From 30eae01923fb003b22ca8ea5b287bd15ee56fb73 Mon Sep 17 00:00:00 2001 From: Ghais Zaher Date: Sat, 12 Aug 2023 12:30:33 +0200 Subject: [PATCH] feat: plan queue functionality --- cmd/server.go | 6 + runatlantis.io/docs/server-configuration.md | 8 + server/controllers/api_controller.go | 4 +- .../events/events_controller_e2e_test.go | 2 +- server/controllers/locks_controller.go | 51 +++- server/controllers/locks_controller_test.go | 67 ++++- server/controllers/templates/web_templates.go | 30 ++ server/core/db/boltdb.go | 196 +++++++++++-- server/core/db/boltdb_test.go | 271 +++++++++++++++--- server/core/locking/locking.go | 61 ++-- server/core/locking/locking_test.go | 34 ++- server/core/locking/mocks/mock_backend.go | 128 +++++++-- server/core/locking/mocks/mock_locker.go | 114 ++++++-- server/core/redis/redis.go | 191 ++++++++++-- server/core/redis/redis_test.go | 271 +++++++++++++++--- server/events/apply_command_runner_test.go | 2 +- server/events/command_runner_test.go | 28 +- server/events/delete_lock_command.go | 28 +- server/events/delete_lock_command_test.go | 30 +- .../events/mocks/mock_delete_lock_command.go | 28 +- server/events/models/models.go | 44 +++ server/events/models/models_test.go | 39 +++ server/events/plan_command_runner.go | 4 +- server/events/plan_command_runner_test.go | 6 +- server/events/project_locker.go | 28 +- server/events/project_locker_test.go | 86 +++++- server/events/pull_closed_executor.go | 25 +- server/events/pull_closed_executor_test.go | 18 +- server/events/unlock_command_runner.go | 43 ++- server/server.go | 7 +- server/static/css/custom.css | 6 +- server/user_config.go | 1 + 32 files changed, 1546 insertions(+), 311 deletions(-) diff --git a/cmd/server.go b/cmd/server.go index 869d46f3f2..c85e793a75 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -106,6 +106,7 @@ const ( StatsNamespace = "stats-namespace" AllowDraftPRs = "allow-draft-prs" PortFlag = "port" + QueueEnabled = "queue-enabled" RedisDB = "redis-db" RedisHost = "redis-host" RedisPassword = "redis-password" @@ -166,6 +167,7 @@ const ( DefaultRedisDB = 0 DefaultRedisPort = 6379 DefaultRedisTLSEnabled = false + DefaultQueueEnabled = false DefaultRedisInsecureSkipVerify = false DefaultTFDownloadURL = "https://releases.hashicorp.com" DefaultTFDownload = true @@ -482,6 +484,10 @@ var boolFlags = map[string]boolFlag{ description: "Exclude policy check comments from pull requests unless there's an actual error from conftest. This also excludes warnings.", defaultValue: false, }, + QueueEnabled: { + description: "Enable lock queue.", + defaultValue: DefaultQueueEnabled, + }, RedisTLSEnabled: { description: "Enable TLS on the connection to Redis with a min TLS version of 1.2", defaultValue: DefaultRedisTLSEnabled, diff --git a/runatlantis.io/docs/server-configuration.md b/runatlantis.io/docs/server-configuration.md index bca4e04da4..606298efd2 100644 --- a/runatlantis.io/docs/server-configuration.md +++ b/runatlantis.io/docs/server-configuration.md @@ -701,6 +701,14 @@ This is useful when you have many projects and want to keep the pull request cle ``` Exclude policy check comments from pull requests unless there's an actual error from conftest. This also excludes warnings. Defaults to `false`. +### `--queue-enabled` + ```bash + atlantis server --queue-enabled + # or + ATLANTIS_QUEUE_ENABLED=true + ``` + Enable lock queue. Defaults to `false`. + ### `--redis-host` ```bash atlantis server --redis-host="localhost" diff --git a/server/controllers/api_controller.go b/server/controllers/api_controller.go index 784120e982..afecdfa0bd 100644 --- a/server/controllers/api_controller.go +++ b/server/controllers/api_controller.go @@ -92,7 +92,7 @@ func (a *APIController) Plan(w http.ResponseWriter, r *http.Request) { a.apiReportError(w, http.StatusInternalServerError, err) return } - defer a.Locker.UnlockByPull(ctx.HeadRepo.FullName, 0) // nolint: errcheck + defer a.Locker.UnlockByPull(ctx.HeadRepo.FullName, 0, true) // nolint: errcheck if result.HasErrors() { code = http.StatusInternalServerError } @@ -121,7 +121,7 @@ func (a *APIController) Apply(w http.ResponseWriter, r *http.Request) { a.apiReportError(w, http.StatusInternalServerError, err) return } - defer a.Locker.UnlockByPull(ctx.HeadRepo.FullName, 0) // nolint: errcheck + defer a.Locker.UnlockByPull(ctx.HeadRepo.FullName, 0, true) // nolint: errcheck // We can now prepare and run the apply step result, err := a.apiApply(request, ctx) diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index b02af1d06a..d911f7a85c 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -1224,7 +1224,7 @@ func setupE2E(t *testing.T, repoDir string, opt setupOption) (events_controllers } terraformClient, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "", "default-tf-version", "https://releases.hashicorp.com", &NoopTFDownloader{}, true, false, projectCmdOutputHandler) Ok(t, err) - boltdb, err := db.New(dataDir) + boltdb, err := db.New(dataDir, false) Ok(t, err) backend := boltdb lockingClient := locking.NewClient(boltdb) diff --git a/server/controllers/locks_controller.go b/server/controllers/locks_controller.go index 3fce3f5ad6..875abf3d1a 100644 --- a/server/controllers/locks_controller.go +++ b/server/controllers/locks_controller.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "net/url" + "strings" "github.com/runatlantis/atlantis/server/controllers/templates" @@ -77,6 +78,10 @@ func (l *LocksController) GetLock(w http.ResponseWriter, r *http.Request) { return } + // get queues locks for this lock details page + var queue models.ProjectLockQueue + queue, _ = l.Locker.GetQueueByLock(lock.Project, lock.Workspace) + lockDetailQueue := GetQueueItemIndexData(queue) owner, repo := models.SplitRepoFullName(lock.Project.RepoFullName) viewData := templates.LockDetailData{ LockKeyEncoded: id, @@ -88,6 +93,7 @@ func (l *LocksController) GetLock(w http.ResponseWriter, r *http.Request) { CleanedBasePath: l.AtlantisURL.Path, RepoOwner: owner, RepoName: repo, + Queue: lockDetailQueue, } err = l.LockDetailTemplate.Execute(w, viewData) @@ -111,7 +117,7 @@ func (l *LocksController) DeleteLock(w http.ResponseWriter, r *http.Request) { return } - lock, err := l.DeleteLockCommand.DeleteLock(idUnencoded) + lock, dequeuedLock, err := l.DeleteLockCommand.DeleteLock(idUnencoded) if err != nil { l.respond(w, logging.Error, http.StatusInternalServerError, "deleting lock failed with: %s", err) return @@ -136,6 +142,10 @@ func (l *LocksController) DeleteLock(w http.ResponseWriter, r *http.Request) { if err = l.VCSClient.CreateComment(lock.Pull.BaseRepo, lock.Pull.Num, comment, ""); err != nil { l.Logger.Warn("failed commenting on pull request: %s", err) } + if dequeuedLock != nil { + l.Logger.Warn("dequeued lock: %s", dequeuedLock) + l.commentOnDequeuedPullRequests(*dequeuedLock) + } } else { l.Logger.Debug("skipping commenting on pull request and deleting workspace because BaseRepo field is empty") } @@ -150,3 +160,42 @@ func (l *LocksController) respond(w http.ResponseWriter, lvl logging.LogLevel, r w.WriteHeader(responseCode) fmt.Fprintln(w, response) } + +func (l *LocksController) commentOnDequeuedPullRequests(dequeuedLock models.ProjectLock) { + planVcsMessage := buildCommentOnDequeuedPullRequest([]models.ProjectLock{dequeuedLock}) + if commentErr := l.VCSClient.CreateComment(dequeuedLock.Pull.BaseRepo, dequeuedLock.Pull.Num, planVcsMessage, ""); commentErr != nil { + l.Logger.Err("unable to comment on PR %d: %s", dequeuedLock.Pull.Num, commentErr) + } +} + +func buildCommentOnDequeuedPullRequest(projectLocks []models.ProjectLock) string { + var releasedLocksMessages []string + for _, lock := range projectLocks { + releasedLocksMessages = append(releasedLocksMessages, fmt.Sprintf("* dir: `%s` workspace: `%s`", lock.Project.Path, lock.Workspace)) + } + + // stick to the first User for now, if needed, create a list of unique users and mention them all + lockCreatorMention := "@" + projectLocks[0].User.Username + releasedLocksMessage := strings.Join(releasedLocksMessages, "\n") + + return fmt.Sprintf("%s\nThe following locks have been aquired by this PR and can now be planned:\n%s", + lockCreatorMention, releasedLocksMessage) +} + +func GetQueueItemIndexData(q models.ProjectLockQueue) []templates.QueueItemIndexData { + var queueIndexDataList []templates.QueueItemIndexData + for _, projectLock := range q { + queueIndexDataList = append(queueIndexDataList, templates.QueueItemIndexData{ + LockPath: "Not yet acquired", + RepoFullName: projectLock.Project.RepoFullName, + PullNum: projectLock.Pull.Num, + Path: projectLock.Project.Path, + Workspace: projectLock.Workspace, + Time: projectLock.Time, + TimeFormatted: projectLock.Time.Format("02-01-2006 15:04:05"), + PullURL: projectLock.Pull.URL, + Author: projectLock.Pull.Author, + }) + } + return queueIndexDataList +} diff --git a/server/controllers/locks_controller_test.go b/server/controllers/locks_controller_test.go index 88e538d15d..ddbbb0361b 100644 --- a/server/controllers/locks_controller_test.go +++ b/server/controllers/locks_controller_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "reflect" "testing" "time" @@ -222,7 +223,7 @@ func TestDeleteLock_LockerErr(t *testing.T) { t.Log("If there is an error retrieving the lock, a 500 is returned") RegisterMockTestingT(t) dlc := mocks2.NewMockDeleteLockCommand() - When(dlc.DeleteLock("id")).ThenReturn(nil, errors.New("err")) + When(dlc.DeleteLock("id")).ThenReturn(nil, nil, errors.New("err")) lc := controllers.LocksController{ DeleteLockCommand: dlc, Logger: logging.NewNoopLogger(t), @@ -238,7 +239,7 @@ func TestDeleteLock_None(t *testing.T) { t.Log("If there is no lock at that ID we get a 404") RegisterMockTestingT(t) dlc := mocks2.NewMockDeleteLockCommand() - When(dlc.DeleteLock("id")).ThenReturn(nil, nil) + When(dlc.DeleteLock("id")).ThenReturn(nil, nil, nil) lc := controllers.LocksController{ DeleteLockCommand: dlc, Logger: logging.NewNoopLogger(t), @@ -255,7 +256,7 @@ func TestDeleteLock_OldFormat(t *testing.T) { RegisterMockTestingT(t) cp := vcsmocks.NewMockClient() dlc := mocks2.NewMockDeleteLockCommand() - When(dlc.DeleteLock("id")).ThenReturn(&models.ProjectLock{}, nil) + When(dlc.DeleteLock("id")).ThenReturn(&models.ProjectLock{}, nil, nil) lc := controllers.LocksController{ DeleteLockCommand: dlc, Logger: logging.NewNoopLogger(t), @@ -291,10 +292,10 @@ func TestDeleteLock_UpdateProjectStatus(t *testing.T) { Path: projectPath, RepoFullName: repoName, }, - }, nil) + }, nil, nil) var backend locking.Backend tmp := t.TempDir() - backend, err := db.New(tmp) + backend, err := db.New(tmp, false) Ok(t, err) // Seed the DB with a successful plan for that project (that is later discarded). _, err = backend.UpdatePullWithResults(pull, []command.ProjectResult{ @@ -342,13 +343,13 @@ func TestDeleteLock_CommentFailed(t *testing.T) { Pull: models.PullRequest{ BaseRepo: models.Repo{FullName: "owner/repo"}, }, - }, nil) + }, nil, nil) cp := vcsmocks.NewMockClient() workingDir := mocks2.NewMockWorkingDir() workingDirLocker := events.NewDefaultWorkingDirLocker() var backend locking.Backend tmp := t.TempDir() - backend, err := db.New(tmp) + backend, err := db.New(tmp, false) Ok(t, err) When(cp.CreateComment(Any[models.Repo](), Any[int](), Any[string](), Any[string]())).ThenReturn(errors.New("err")) lc := controllers.LocksController{ @@ -375,7 +376,7 @@ func TestDeleteLock_CommentSuccess(t *testing.T) { workingDirLocker := events.NewDefaultWorkingDirLocker() var backend locking.Backend tmp := t.TempDir() - backend, err := db.New(tmp) + backend, err := db.New(tmp, false) Ok(t, err) pull := models.PullRequest{ BaseRepo: models.Repo{FullName: "owner/repo"}, @@ -387,7 +388,7 @@ func TestDeleteLock_CommentSuccess(t *testing.T) { Path: "path", RepoFullName: "owner/repo", }, - }, nil) + }, nil, nil) lc := controllers.LocksController{ DeleteLockCommand: dlc, Logger: logging.NewNoopLogger(t), @@ -405,3 +406,51 @@ func TestDeleteLock_CommentSuccess(t *testing.T) { "**Warning**: The plan for dir: `path` workspace: `workspace` was **discarded** via the Atlantis UI.\n\n"+ "To `apply` this plan you must run `plan` again.", "") } + +func TestQueueItemIndexData(t *testing.T) { + layout := "2006-01-02T15:04:05.000Z" + strLockTime := "2020-09-01T00:45:26.371Z" + lockTime, _ := time.Parse(layout, strLockTime) + tests := []struct { + name string + queue models.ProjectLockQueue + want []templates.QueueItemIndexData + }{ + { + name: "empty list", + queue: models.ProjectLockQueue{}, + want: nil, + }, + { + name: "list with one item", + queue: models.ProjectLockQueue{ + { + Project: models.Project{RepoFullName: "org/repo", Path: "path"}, + Workspace: "workspace", + Time: lockTime, + Pull: models.PullRequest{Num: 15, Author: "pull-author", URL: "org/repo/pull/15"}, + }, + }, + want: []templates.QueueItemIndexData{ + { + LockPath: "Not yet acquired", + RepoFullName: "org/repo", + PullNum: 15, + Path: "path", + Workspace: "workspace", + Time: lockTime, + TimeFormatted: "01-09-2020 00:45:26", + PullURL: "org/repo/pull/15", + Author: "pull-author", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := controllers.GetQueueItemIndexData(tt.queue); !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetQueueItemIndexData() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/server/controllers/templates/web_templates.go b/server/controllers/templates/web_templates.go index 23c8835cf9..bef3f81557 100644 --- a/server/controllers/templates/web_templates.go +++ b/server/controllers/templates/web_templates.go @@ -39,6 +39,19 @@ type LockIndexData struct { LockedBy string Time time.Time TimeFormatted string + Queue []QueueItemIndexData +} + +type QueueItemIndexData struct { + LockPath string + RepoFullName string + PullNum int + Path string + Workspace string + Time time.Time + TimeFormatted string + PullURL string + Author string } // ApplyLockData holds the fields to display in the index view @@ -123,6 +136,7 @@ var IndexTemplate = template.Must(template.New("index.html.tmpl").Parse(` Locked By Date/Time Status + Queue {{ range .Locks }}
@@ -144,6 +158,9 @@ var IndexTemplate = template.Must(template.New("index.html.tmpl").Parse(` Locked + + {{ len .Queue }} +
{{ end }} @@ -275,6 +292,7 @@ type LockDetailData struct { // not using a path-based proxy, this will be an empty string. Never ends // in a '/' (hence "cleaned"). CleanedBasePath string + Queue []QueueItemIndexData } var LockTemplate = template.Must(template.New("lock.html.tmpl").Parse(` @@ -308,6 +326,18 @@ var LockTemplate = template.Must(template.New("lock.html.tmpl").Parse(`
Pull Request Link:
{{.PullRequestLink}}
Locked By:
{{.LockedBy}}
Workspace:
{{.Workspace}}
+ {{ if .Queue }} +
Queue:
+
+ {{ range .Queue }} +
+
Pull Request Link:
{{.PullURL}}
+
Author
{{.Author}}
+
Time:
{{.TimeFormatted}}
+
+ {{ end }} +
+ {{ end }}
Discard Plan & Unlock diff --git a/server/core/db/boltdb.go b/server/core/db/boltdb.go index 79e0798c29..515a6cbafa 100644 --- a/server/core/db/boltdb.go +++ b/server/core/db/boltdb.go @@ -20,12 +20,15 @@ import ( type BoltDB struct { db *bolt.DB locksBucketName []byte + queueBucketName []byte pullsBucketName []byte globalLocksBucketName []byte + queueEnabled bool } const ( locksBucketName = "runLocks" + queueBucketName = "queue" pullsBucketName = "pulls" globalLocksBucketName = "globalLocks" pullKeySeparator = "::" @@ -33,7 +36,7 @@ const ( // New returns a valid locker. We need to be able to write to dataDir // since bolt stores its data as a file -func New(dataDir string) (*BoltDB, error) { +func New(dataDir string, queueEnabled bool) (*BoltDB, error) { if err := os.MkdirAll(dataDir, 0700); err != nil { return nil, errors.Wrap(err, "creating data dir") } @@ -50,6 +53,9 @@ func New(dataDir string) (*BoltDB, error) { if _, err = tx.CreateBucketIfNotExists([]byte(locksBucketName)); err != nil { return errors.Wrapf(err, "creating bucket %q", locksBucketName) } + if _, err = tx.CreateBucketIfNotExists([]byte(queueBucketName)); err != nil { + return errors.Wrapf(err, "creating bucket %q", queueBucketName) + } if _, err = tx.CreateBucketIfNotExists([]byte(pullsBucketName)); err != nil { return errors.Wrapf(err, "creating bucket %q", pullsBucketName) } @@ -65,28 +71,33 @@ func New(dataDir string) (*BoltDB, error) { return &BoltDB{ db: db, locksBucketName: []byte(locksBucketName), + queueBucketName: []byte(queueBucketName), pullsBucketName: []byte(pullsBucketName), globalLocksBucketName: []byte(globalLocksBucketName), + queueEnabled: queueEnabled, }, nil } // NewWithDB is used for testing. -func NewWithDB(db *bolt.DB, bucket string, globalBucket string) (*BoltDB, error) { +func NewWithDB(db *bolt.DB, bucket string, globalBucket string, queueEnabled bool) (*BoltDB, error) { return &BoltDB{ db: db, locksBucketName: []byte(bucket), + queueBucketName: []byte(queueBucketName), pullsBucketName: []byte(pullsBucketName), globalLocksBucketName: []byte(globalBucket), + queueEnabled: queueEnabled, }, nil } // TryLock attempts to create a new lock. If the lock is // acquired, it will return true and the lock returned will be newLock. -// If the lock is not acquired, it will return false and the current -// lock that is preventing this lock from being acquired. -func (b *BoltDB) TryLock(newLock models.ProjectLock) (bool, models.ProjectLock, error) { +// If the lock is not acquired, it will return false, the current +// lock that is preventing this lock from being acquired, and the enqueue status. +func (b *BoltDB) TryLock(newLock models.ProjectLock) (bool, models.ProjectLock, *models.EnqueueStatus, error) { var lockAcquired bool var currLock models.ProjectLock + var enqueueStatus *models.EnqueueStatus key := b.lockKey(newLock.Project, newLock.Workspace) newLockSerialized, _ := json.Marshal(newLock) transactionErr := b.db.Update(func(tx *bolt.Tx) error { @@ -106,23 +117,86 @@ func (b *BoltDB) TryLock(newLock models.ProjectLock) (bool, models.ProjectLock, if err := json.Unmarshal(currLockSerialized, &currLock); err != nil { return errors.Wrap(err, "failed to deserialize current lock") } + + // checking if current lock is with the same PR + if currLock.Pull.Num == newLock.Pull.Num { + lockAcquired = true + return nil + } + lockAcquired = false + + if b.queueEnabled { + var err error + if enqueueStatus, err = b.enqueue(tx, key, newLock); err != nil { + return errors.Wrap(err, "") + } + } + return nil }) if transactionErr != nil { - return false, currLock, errors.Wrap(transactionErr, "DB transaction failed") + return false, currLock, enqueueStatus, errors.Wrap(transactionErr, "DB transaction failed") + } + + return lockAcquired, currLock, enqueueStatus, nil +} + +func (b *BoltDB) enqueue(tx *bolt.Tx, key string, newLock models.ProjectLock) (*models.EnqueueStatus, error) { + queueBucket := tx.Bucket(b.queueBucketName) + currQueueSerialized := queueBucket.Get([]byte(key)) + // Queue doesn't exist, create one + if currQueueSerialized == nil { + newQueue := models.ProjectLockQueue{newLock} + newQueueSerialized, err := json.Marshal(newQueue) + if err != nil { + return nil, errors.Wrap(err, "failed to serialize queue") + } + if err := queueBucket.Put([]byte(key), newQueueSerialized); err != nil { + return nil, errors.Wrapf(err, "failed to put queue with key %s", key) + } + return &models.EnqueueStatus{ + Status: models.Enqueued, + QueueDepth: 1, + }, nil + } + // Queue exists + var currQueue models.ProjectLockQueue + if err := json.Unmarshal(currQueueSerialized, &currQueue); err != nil { + return nil, errors.Wrap(err, "failed to deserialize queue for current lock") } - return lockAcquired, currLock, nil + // Lock is already in the queue + if indexInQueue := currQueue.FindPullRequest(newLock.Pull.Num); indexInQueue > -1 { + return &models.EnqueueStatus{ + Status: models.AlreadyInTheQueue, + QueueDepth: indexInQueue + 1, + }, nil + } + // Not in the queue, add it + newQueue := append(currQueue, newLock) + newQueueSerialized, err := json.Marshal(newQueue) + if err != nil { + return nil, errors.Wrap(err, "failed to serialize queue") + } + if err := queueBucket.Put([]byte(key), newQueueSerialized); err != nil { + return nil, errors.Wrapf(err, "failed to put queue with key %s", key) + } + return &models.EnqueueStatus{ + Status: models.Enqueued, + QueueDepth: len(newQueue), + }, nil } // Unlock attempts to unlock the project and workspace. // If there is no lock, then it will return a nil pointer. // If there is a lock, then it will delete it, and then return a pointer -// to the deleted lock. -func (b *BoltDB) Unlock(p models.Project, workspace string) (*models.ProjectLock, error) { +// to the deleted lock. If updateQueue is true, it will also grant the +// lock to the next PR in the queue, update the queue and return the dequeued lock. +func (b *BoltDB) Unlock(p models.Project, workspace string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) { var lock models.ProjectLock + var dequeuedLock *models.ProjectLock foundLock := false key := b.lockKey(p, workspace) err := b.db.Update(func(tx *bolt.Tx) error { @@ -134,13 +208,68 @@ func (b *BoltDB) Unlock(p models.Project, workspace string) (*models.ProjectLock } foundLock = true } - return bucket.Delete([]byte(key)) + + if err := bucket.Delete([]byte(key)); err != nil { + return errors.Wrap(err, "failed to delete key") + } + + // Dequeue next item + if b.queueEnabled && updateQueue { + var err error + dequeuedLock, err = b.dequeue(tx, key) + if err != nil { + return errors.Wrapf(err, "failed to dequeue key %s", key) + } + } + return nil }) + err = errors.Wrap(err, "DB transaction failed") if foundLock { - return &lock, err + return &lock, dequeuedLock, err } - return nil, err + return nil, nil, err +} + +func (b *BoltDB) dequeue(tx *bolt.Tx, key string) (*models.ProjectLock, error) { + queueBucket := tx.Bucket(b.queueBucketName) + currQueueSerialized := queueBucket.Get([]byte(key)) + + // Queue doesn't exist + if currQueueSerialized == nil { + return nil, nil + } + + // Queue exists + var currQueue models.ProjectLockQueue + if err := json.Unmarshal(currQueueSerialized, &currQueue); err != nil { + return nil, errors.Wrap(err, "failed to deserialize queue for current lock") + } + + dequeuedLock, newQueue := currQueue.Dequeue() + + // A lock was dequeued - update current lock holder + if dequeuedLock != nil { + dequeuedLockSerialized, err := json.Marshal(*dequeuedLock) + if err != nil { + return nil, errors.Wrap(err, "serializing") + } + locksBucket := tx.Bucket(b.locksBucketName) + if err := locksBucket.Put([]byte(key), dequeuedLockSerialized); err != nil { + return nil, errors.Wrap(err, "failed to give the lock to next PR in the queue") + } + } + + // New queue is empty and can be deleted + if len(newQueue) == 0 { + return dequeuedLock, queueBucket.Delete([]byte(key)) + } + + newQueueSerialized, err := json.Marshal(newQueue) + if err != nil { + return nil, errors.Wrap(err, "serializing") + } + return dequeuedLock, queueBucket.Put([]byte(key), newQueueSerialized) } // List lists all current locks. @@ -171,6 +300,30 @@ func (b *BoltDB) List() ([]models.ProjectLock, error) { return locks, nil } +func (b *BoltDB) GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) { + if !b.queueEnabled { + return nil, nil + } + var queue models.ProjectLockQueue + err := b.db.View(func(tx *bolt.Tx) error { + // construct lock key + key := b.lockKey(project, workspace) + queueBucket := tx.Bucket(b.queueBucketName) + currQueueSerialized := queueBucket.Get([]byte(key)) + if currQueueSerialized == nil { + return nil + } + if err := json.Unmarshal(currQueueSerialized, &queue); err != nil { + return errors.Wrapf(err, "failed to deserialize queue for lock %q", queue) + } + return nil + }) + if err != nil { + return queue, errors.Wrap(err, "DB transaction failed while fetching Queue") + } + return queue, nil +} + // LockCommand attempts to create a new lock for a CommandName. // If the lock doesn't exists, it will create a lock and return a pointer to it. // If the lock already exists, it will return an "lock already exists" error @@ -253,7 +406,7 @@ func (b *BoltDB) CheckCommandLock(cmdName command.Name) (*command.Lock, error) { } // UnlockByPull deletes all locks associated with that pull request and returns them. -func (b *BoltDB) UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) { +func (b *BoltDB) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) { var locks []models.ProjectLock err := b.db.View(func(tx *bolt.Tx) error { c := tx.Bucket(b.locksBucketName).Cursor() @@ -268,19 +421,28 @@ func (b *BoltDB) UnlockByPull(repoFullName string, pullNum int) ([]models.Projec locks = append(locks, lock) } } + return nil }) + if err != nil { - return locks, err + return locks, nil, err } + var dequeuedLocks = make([]models.ProjectLock, 0, len(locks)) + // delete the locks for _, lock := range locks { - if _, err = b.Unlock(lock.Project, lock.Workspace); err != nil { - return locks, errors.Wrapf(err, "unlocking repo %s, path %s, workspace %s", lock.Project.RepoFullName, lock.Project.Path, lock.Workspace) + var dequeuedLock *models.ProjectLock + if _, dequeuedLock, err = b.Unlock(lock.Project, lock.Workspace, updateQueue); err != nil { + return locks, nil, errors.Wrapf(err, "unlocking repo %s, path %s, workspace %s", lock.Project.RepoFullName, lock.Project.Path, lock.Workspace) + } + if dequeuedLock != nil { + dequeuedLocks = append(dequeuedLocks, *dequeuedLock) } } - return locks, nil + + return locks, &models.DequeueStatus{ProjectLocks: dequeuedLocks}, nil } // GetLock returns a pointer to the lock for that project and workspace. diff --git a/server/core/db/boltdb_test.go b/server/core/db/boltdb_test.go index e16a10d086..8a80446009 100644 --- a/server/core/db/boltdb_test.go +++ b/server/core/db/boltdb_test.go @@ -29,6 +29,7 @@ import ( var lockBucket = "bucket" var configBucket = "configBucket" +var queueBucket = "queue" var project = models.NewProject("owner/repo", "parent/child") var workspace = "default" var pullNum = 1 @@ -44,6 +45,88 @@ var lock = models.ProjectLock{ Time: time.Now(), } +func TestGetQueueByLock(t *testing.T) { + t.Log("Getting Queue By Lock") + db, b := newTestDBQueue(true) + defer cleanupDB(db) + + // queue doesn't exist -> should return nil + queue, err := b.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Assert(t, queue == nil, "exp nil") + + _, _, _, err = b.TryLock(lock) + Ok(t, err) + + lock1 := lock + lock1.Pull.Num = 2 + _, _, _, err = b.TryLock(lock1) // this lock should be queued + Ok(t, err) + + lock2 := lock + lock2.Pull.Num = 3 + _, _, _, err = b.TryLock(lock2) // this lock should be queued + Ok(t, err) + + queue, _ = b.GetQueueByLock(lock.Project, lock.Workspace) + Equals(t, 2, len(queue)) +} + +func TestSingleQueue(t *testing.T) { + t.Log("locking should return correct EnqueueStatus for a single queue") + db, b := newTestDBQueue(true) + defer cleanupDB(db) + + lockAcquired, _, _, err := b.TryLock(lock) + Ok(t, err) + Equals(t, true, lockAcquired) + + secondLock := lock + secondLock.Pull.Num = pullNum + 1 + lockAcquired, _, enqueueStatus, err := b.TryLock(secondLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, models.Enqueued, enqueueStatus.Status) + Equals(t, 1, enqueueStatus.QueueDepth) + + lockAcquired, _, enqueueStatus, err = b.TryLock(secondLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, models.AlreadyInTheQueue, enqueueStatus.Status) + Equals(t, 1, enqueueStatus.QueueDepth) + + thirdLock := lock + thirdLock.Pull.Num = pullNum + 2 + lockAcquired, _, enqueueStatus, err = b.TryLock(thirdLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, models.Enqueued, enqueueStatus.Status) + Equals(t, 2, enqueueStatus.QueueDepth) +} + +func TestMultipleQueues(t *testing.T) { + t.Log("locking should return correct EnqueueStatus for multiple queues") + db, b := newTestDBQueue(true) + defer cleanupDB(db) + + lockAcquired, _, _, err := b.TryLock(lock) + Ok(t, err) + Equals(t, true, lockAcquired) + + lockInDifferentWorkspace := lock + lockInDifferentWorkspace.Workspace = "different-workspace" + lockAcquired, _, _, err = b.TryLock(lockInDifferentWorkspace) + Ok(t, err) + Equals(t, true, lockAcquired) + + secondLock := lock + secondLock.Pull.Num = pullNum + 1 + lockAcquired, _, enqueueStatus, err := b.TryLock(secondLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, 1, enqueueStatus.QueueDepth) +} + func TestLockCommandNotSet(t *testing.T) { t.Log("retrieving apply lock when there are none should return empty LockCommand") db, b := newTestDB() @@ -113,7 +196,7 @@ func TestMixedLocksPresent(t *testing.T) { _, err := b.LockCommand(command.Apply, timeNow) Ok(t, err) - _, _, err = b.TryLock(lock) + _, _, _, err = b.TryLock(lock) Ok(t, err) ls, err := b.List() Ok(t, err) @@ -133,7 +216,7 @@ func TestListOneLock(t *testing.T) { t.Log("listing locks when there is one should return it") db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) ls, err := b.List() Ok(t, err) @@ -156,7 +239,7 @@ func TestListMultipleLocks(t *testing.T) { for _, r := range repos { newLock := lock newLock.Project = models.NewProject(r, "path") - _, _, err := b.TryLock(newLock) + _, _, _, err := b.TryLock(newLock) Ok(t, err) } ls, err := b.List() @@ -177,9 +260,9 @@ func TestListAddRemove(t *testing.T) { t.Log("listing after adding and removing should return none") db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) - _, err = b.Unlock(project, workspace) + _, _, err = b.Unlock(project, workspace, true) Ok(t, err) ls, err := b.List() @@ -191,7 +274,7 @@ func TestLockingNoLocks(t *testing.T) { t.Log("with no locks yet, lock should succeed") db, b := newTestDB() defer cleanupDB(db) - acquired, currLock, err := b.TryLock(lock) + acquired, currLock, _, err := b.TryLock(lock) Ok(t, err) Equals(t, true, acquired) Equals(t, lock, currLock) @@ -199,16 +282,16 @@ func TestLockingNoLocks(t *testing.T) { func TestLockingExistingLock(t *testing.T) { t.Log("if there is an existing lock, lock should...") - db, b := newTestDB() + db, b := newTestDBQueue(true) defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) t.Log("...succeed if the new project has a different path") { newLock := lock newLock.Project = models.NewProject(project.RepoFullName, "different/path") - acquired, currLock, err := b.TryLock(newLock) + acquired, currLock, _, err := b.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, pullNum, currLock.Pull.Num) @@ -218,7 +301,7 @@ func TestLockingExistingLock(t *testing.T) { { newLock := lock newLock.Workspace = "different-workspace" - acquired, currLock, err := b.TryLock(newLock) + acquired, currLock, _, err := b.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, newLock, currLock) @@ -228,20 +311,20 @@ func TestLockingExistingLock(t *testing.T) { { newLock := lock newLock.Project = models.NewProject("different/repo", project.Path) - acquired, currLock, err := b.TryLock(newLock) + acquired, currLock, _, err := b.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, newLock, currLock) } - t.Log("...not succeed if the new project only has a different pullNum") + t.Log("...succeed if the new project has a different pullNum, the locking attempt will be queued") { newLock := lock newLock.Pull.Num = lock.Pull.Num + 1 - acquired, currLock, err := b.TryLock(newLock) + acquired, _, enqueueStatus, err := b.TryLock(newLock) Ok(t, err) Equals(t, false, acquired) - Equals(t, currLock.Pull.Num, pullNum) + Equals(t, 1, enqueueStatus.QueueDepth) } } @@ -249,7 +332,7 @@ func TestUnlockingNoLocks(t *testing.T) { t.Log("unlocking with no locks should succeed") db, b := newTestDB() defer cleanupDB(db) - _, err := b.Unlock(project, workspace) + _, _, err := b.Unlock(project, workspace, true) Ok(t, err) } @@ -259,9 +342,9 @@ func TestUnlocking(t *testing.T) { db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) - _, err = b.Unlock(project, workspace) + _, _, err = b.Unlock(project, workspace, true) Ok(t, err) // should be no locks listed @@ -272,7 +355,7 @@ func TestUnlocking(t *testing.T) { // should be able to re-lock that repo with a new pull num newLock := lock newLock.Pull.Num = lock.Pull.Num + 1 - acquired, currLock, err := b.TryLock(newLock) + acquired, currLock, _, err := b.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, newLock, currLock) @@ -283,32 +366,32 @@ func TestUnlockingMultiple(t *testing.T) { db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) new := lock new.Project.RepoFullName = "new/repo" - _, _, err = b.TryLock(new) + _, _, _, err = b.TryLock(new) Ok(t, err) new2 := lock new2.Project.Path = "new/path" - _, _, err = b.TryLock(new2) + _, _, _, err = b.TryLock(new2) Ok(t, err) new3 := lock new3.Workspace = "new-workspace" - _, _, err = b.TryLock(new3) + _, _, _, err = b.TryLock(new3) Ok(t, err) // now try and unlock them - _, err = b.Unlock(new3.Project, new3.Workspace) + _, _, err = b.Unlock(new3.Project, new3.Workspace, true) Ok(t, err) - _, err = b.Unlock(new2.Project, workspace) + _, _, err = b.Unlock(new2.Project, workspace, true) Ok(t, err) - _, err = b.Unlock(new.Project, workspace) + _, _, err = b.Unlock(new.Project, workspace, true) Ok(t, err) - _, err = b.Unlock(project, workspace) + _, _, err = b.Unlock(project, workspace, true) Ok(t, err) // should be none left @@ -322,7 +405,7 @@ func TestUnlockByPullNone(t *testing.T) { db, b := newTestDB() defer cleanupDB(db) - _, err := b.UnlockByPull("any/repo", 1) + _, _, err := b.UnlockByPull("any/repo", 1, true) Ok(t, err) } @@ -330,12 +413,12 @@ func TestUnlockByPullOne(t *testing.T) { t.Log("with one lock, UnlockByPull should...") db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) t.Log("...delete nothing when its the same repo but a different pull") { - _, err := b.UnlockByPull(project.RepoFullName, pullNum+1) + _, _, err := b.UnlockByPull(project.RepoFullName, pullNum+1, true) Ok(t, err) ls, err := b.List() Ok(t, err) @@ -343,7 +426,7 @@ func TestUnlockByPullOne(t *testing.T) { } t.Log("...delete nothing when its the same pull but a different repo") { - _, err := b.UnlockByPull("different/repo", pullNum) + _, _, err := b.UnlockByPull("different/repo", pullNum, true) Ok(t, err) ls, err := b.List() Ok(t, err) @@ -351,7 +434,7 @@ func TestUnlockByPullOne(t *testing.T) { } t.Log("...delete the lock when its the same repo and pull") { - _, err := b.UnlockByPull(project.RepoFullName, pullNum) + _, _, err := b.UnlockByPull(project.RepoFullName, pullNum, true) Ok(t, err) ls, err := b.List() Ok(t, err) @@ -363,12 +446,12 @@ func TestUnlockByPullAfterUnlock(t *testing.T) { t.Log("after locking and unlocking, UnlockByPull should be successful") db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) - _, err = b.Unlock(project, workspace) + _, _, err = b.Unlock(project, workspace, true) Ok(t, err) - _, err = b.UnlockByPull(project.RepoFullName, pullNum) + _, _, err = b.UnlockByPull(project.RepoFullName, pullNum, true) Ok(t, err) ls, err := b.List() Ok(t, err) @@ -379,17 +462,17 @@ func TestUnlockByPullMatching(t *testing.T) { t.Log("UnlockByPull should delete all locks in that repo and pull num") db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) // add additional locks with the same repo and pull num but different paths/workspaces new := lock new.Project.Path = "dif/path" - _, _, err = b.TryLock(new) + _, _, _, err = b.TryLock(new) Ok(t, err) new2 := lock new2.Workspace = "new-workspace" - _, _, err = b.TryLock(new2) + _, _, _, err = b.TryLock(new2) Ok(t, err) // there should now be 3 @@ -398,13 +481,111 @@ func TestUnlockByPullMatching(t *testing.T) { Equals(t, 3, len(ls)) // should all be unlocked - _, err = b.UnlockByPull(project.RepoFullName, pullNum) + _, _, err = b.UnlockByPull(project.RepoFullName, pullNum, true) Ok(t, err) ls, err = b.List() Ok(t, err) Equals(t, 0, len(ls)) } +func TestDequeueAfterUnlock(t *testing.T) { + t.Log("unlocking should dequeue and grant lock to the next ProjectLock") + db, b := newTestDBQueue(true) + defer cleanupDB(db) + + // first lock acquired + _, _, _, err := b.TryLock(lock) + Ok(t, err) + + // second lock enqueued + new := lock + new.Pull.Num = pullNum + 1 + _, _, _, err = b.TryLock(new) + Ok(t, err) + + // third lock enqueued + new2 := lock + new2.Pull.Num = pullNum + 2 + _, _, _, err = b.TryLock(new2) + Ok(t, err) + queue, err := b.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Equals(t, 2, len(queue)) + Equals(t, new.Pull, queue[0].Pull) + Equals(t, new2.Pull, queue[1].Pull) + + // first lock unlocked -> second lock dequeued and lock acquired + _, dequeuedLock, err := b.Unlock(lock.Project, lock.Workspace, true) + Ok(t, err) + queue, err = b.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Equals(t, new, *dequeuedLock) + Equals(t, 1, len(queue)) + Equals(t, new2.Pull, queue[0].Pull) + + // second lock unlocked without touching the queue + _, dequeuedLock, err = b.Unlock(new.Project, new.Workspace, false) + Ok(t, err) + Assert(t, dequeuedLock == nil, "exp nil") + queue, err = b.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Equals(t, 1, len(queue)) + Equals(t, new2.Pull, queue[0].Pull) + + l, err := b.GetLock(project, workspace) + Ok(t, err) + Assert(t, l == nil, "exp nil") + + // bring the second lock again + _, _, _, err = b.TryLock(new) + Ok(t, err) + + // second lock unlocked -> third lock dequeued and lock acquired + _, dequeuedLock, err = b.Unlock(new.Project, new.Workspace, true) + Ok(t, err) + Equals(t, new2, *dequeuedLock) + + // Queue is deleted when empty + queue, err = b.GetQueueByLock(new2.Project, new2.Workspace) + Ok(t, err) + Assert(t, queue == nil, "exp nil") + + // third lock unlocked -> no more locks in the queue + _, dequeuedLock, err = b.Unlock(new2.Project, new2.Workspace, true) + Ok(t, err) + Equals(t, (*models.ProjectLock)(nil), dequeuedLock) +} + +func TestDequeueAfterUnlockByPull(t *testing.T) { + t.Log("unlocking by pull should dequeue and grant lock to all dequeued ProjectLocks") + db, b := newTestDBQueue(true) + defer cleanupDB(db) + + _, _, _, err := b.TryLock(lock) + Ok(t, err) + + lock2 := lock + lock2.Workspace = "different-workspace" + _, _, _, err = b.TryLock(lock2) + Ok(t, err) + + lock3 := lock + lock3.Pull.Num = pullNum + 1 + _, _, _, err = b.TryLock(lock3) + Ok(t, err) + + lock4 := lock + lock4.Workspace = "different-workspace" + lock4.Pull.Num = pullNum + 1 + _, _, _, err = b.TryLock(lock4) + Ok(t, err) + + _, dequeueStatus, err := b.UnlockByPull(project.RepoFullName, pullNum, true) + Ok(t, err) + + Equals(t, 2, len(dequeueStatus.ProjectLocks)) +} + func TestGetLockNotThere(t *testing.T) { t.Log("getting a lock that doesn't exist should return a nil pointer") db, b := newTestDB() @@ -418,7 +599,7 @@ func TestGetLock(t *testing.T) { t.Log("getting a lock should return the lock") db, b := newTestDB() defer cleanupDB(db) - _, _, err := b.TryLock(lock) + _, _, _, err := b.TryLock(lock) Ok(t, err) l, err := b.GetLock(project, workspace) @@ -877,6 +1058,11 @@ func TestPullStatus_UpdateMerge_ApprovePolicies(t *testing.T) { // newTestDB returns a TestDB using a temporary path. func newTestDB() (*bolt.DB, *db.BoltDB) { + return newTestDBQueue(false) +} + +// newTestDBQueue returns a TestDB using a temporary path with the option to enable queue. +func newTestDBQueue(queueEnabled bool) (*bolt.DB, *db.BoltDB) { // Retrieve a temporary path. f, err := os.CreateTemp("", "") if err != nil { @@ -897,17 +1083,20 @@ func newTestDB() (*bolt.DB, *db.BoltDB) { if _, err := tx.CreateBucketIfNotExists([]byte(configBucket)); err != nil { return errors.Wrap(err, "failed to create bucket") } + if _, err = tx.CreateBucketIfNotExists([]byte(queueBucket)); err != nil { + return errors.Wrapf(err, "failed to create bucket") + } return nil }); err != nil { panic(errors.Wrap(err, "could not create bucket")) } - b, _ := db.NewWithDB(boltDB, lockBucket, configBucket) + b, _ := db.NewWithDB(boltDB, lockBucket, configBucket, queueEnabled) return boltDB, b } func newTestDB2(t *testing.T) *db.BoltDB { tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) return boltDB } diff --git a/server/core/locking/locking.go b/server/core/locking/locking.go index 78676dbaf3..08e1dc7573 100644 --- a/server/core/locking/locking.go +++ b/server/core/locking/locking.go @@ -30,11 +30,14 @@ import ( // Backend is an implementation of the locking API we require. type Backend interface { - TryLock(lock models.ProjectLock) (bool, models.ProjectLock, error) - Unlock(project models.Project, workspace string) (*models.ProjectLock, error) + TryLock(lock models.ProjectLock) (bool, models.ProjectLock, *models.EnqueueStatus, error) + Unlock(project models.Project, workspace string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) List() ([]models.ProjectLock, error) GetLock(project models.Project, workspace string) (*models.ProjectLock, error) - UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) + UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) + + GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) + UpdateProjectStatus(pull models.PullRequest, workspace string, repoRelDir string, newStatus models.ProjectPlanStatus) error GetPullStatus(pull models.PullRequest) (*models.PullStatus, error) DeletePullStatus(pull models.PullRequest) error @@ -51,6 +54,8 @@ type TryLockResponse struct { LockAcquired bool // CurrLock is what project is currently holding the lock. CurrLock models.ProjectLock + // CurrLock is what project is currently holding the lock. + EnqueueStatus *models.EnqueueStatus // LockKey is an identified by which to lookup and delete this lock. LockKey string } @@ -64,9 +69,10 @@ type Client struct { type Locker interface { TryLock(p models.Project, workspace string, pull models.PullRequest, user models.User) (TryLockResponse, error) - Unlock(key string) (*models.ProjectLock, error) + Unlock(key string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) List() (map[string]models.ProjectLock, error) - UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) + GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) + UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) GetLock(key string) (*models.ProjectLock, error) } @@ -89,23 +95,24 @@ func (c *Client) TryLock(p models.Project, workspace string, pull models.PullReq User: user, Pull: pull, } - lockAcquired, currLock, err := c.backend.TryLock(lock) + lockAcquired, currLock, enqueueStatus, err := c.backend.TryLock(lock) if err != nil { return TryLockResponse{}, err } - return TryLockResponse{lockAcquired, currLock, c.key(p, workspace)}, nil + return TryLockResponse{lockAcquired, currLock, enqueueStatus, c.key(p, workspace)}, nil } // Unlock attempts to unlock a project and workspace. If successful, -// a pointer to the now deleted lock will be returned. Else, that -// pointer will be nil. An error will only be returned if there was -// an error deleting the lock (i.e. not if there was no lock). -func (c *Client) Unlock(key string) (*models.ProjectLock, error) { +// a pointer to the now deleted lock will be returned, as well as a pointer +// to the next dequeued lock from the queue (if any). Else, both +// pointers will be nil. An error will only be returned if there was +// an error deleting the lock or dequeuing (i.e. not if there was no lock). +func (c *Client) Unlock(key string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) { project, workspace, err := c.lockKeyToProjectWorkspace(key) if err != nil { - return nil, err + return nil, nil, err } - return c.backend.Unlock(project, workspace) + return c.backend.Unlock(project, workspace, updateQueue) } // List returns a map of all locks with their lock key as the map key. @@ -122,9 +129,14 @@ func (c *Client) List() (map[string]models.ProjectLock, error) { return m, nil } +func (c *Client) GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) { + return c.backend.GetQueueByLock(project, workspace) +} + +// TODO(Ghais) extend the tests // UnlockByPull deletes all locks associated with that pull request. -func (c *Client) UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) { - return c.backend.UnlockByPull(repoFullName, pullNum) +func (c *Client) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) { + return c.backend.UnlockByPull(repoFullName, pullNum, updateQueue) } // GetLock attempts to get the lock stored at key. If successful, @@ -167,15 +179,15 @@ func NewNoOpLocker() *NoOpLocker { // TryLock attempts to acquire a lock to a project and workspace. func (c *NoOpLocker) TryLock(p models.Project, workspace string, pull models.PullRequest, user models.User) (TryLockResponse, error) { - return TryLockResponse{true, models.ProjectLock{}, c.key(p, workspace)}, nil + return TryLockResponse{true, models.ProjectLock{}, nil, c.key(p, workspace)}, nil } // Unlock attempts to unlock a project and workspace. If successful, -// a pointer to the now deleted lock will be returned. Else, that -// pointer will be nil. An error will only be returned if there was +// pointers to the now deleted lock and the next dequeued lock (optional) will be returned. +// Else, both pointers will be nil. An error will only be returned if there was // an error deleting the lock (i.e. not if there was no lock). -func (c *NoOpLocker) Unlock(key string) (*models.ProjectLock, error) { - return &models.ProjectLock{}, nil +func (c *NoOpLocker) Unlock(key string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) { + return &models.ProjectLock{}, &models.ProjectLock{}, nil } // List returns a map of all locks with their lock key as the map key. @@ -185,9 +197,14 @@ func (c *NoOpLocker) List() (map[string]models.ProjectLock, error) { return m, nil } +func (c *NoOpLocker) GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) { + m := make(models.ProjectLockQueue, 0) + return m, nil +} + // UnlockByPull deletes all locks associated with that pull request. -func (c *NoOpLocker) UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) { - return []models.ProjectLock{}, nil +func (c *NoOpLocker) UnlockByPull(_ string, _ int, _ bool) ([]models.ProjectLock, *models.DequeueStatus, error) { + return []models.ProjectLock{}, nil, nil } // GetLock attempts to get the lock stored at key. If successful, diff --git a/server/core/locking/locking_test.go b/server/core/locking/locking_test.go index dd59454d4b..6f14d046ff 100644 --- a/server/core/locking/locking_test.go +++ b/server/core/locking/locking_test.go @@ -39,7 +39,7 @@ var pl = models.ProjectLock{Project: project, Pull: pull, User: user, Workspace: func TestTryLock_Err(t *testing.T) { RegisterMockTestingT(t) backend := mocks.NewMockBackend() - When(backend.TryLock(Any[models.ProjectLock]())).ThenReturn(false, models.ProjectLock{}, errExpected) + When(backend.TryLock(Any[models.ProjectLock]())).ThenReturn(false, models.ProjectLock{}, nil, errExpected) t.Log("when the backend returns an error, TryLock should return that error") l := locking.NewClient(backend) _, err := l.TryLock(project, workspace, pull, user) @@ -50,7 +50,7 @@ func TestTryLock_Success(t *testing.T) { RegisterMockTestingT(t) currLock := models.ProjectLock{} backend := mocks.NewMockBackend() - When(backend.TryLock(Any[models.ProjectLock]())).ThenReturn(true, currLock, nil) + When(backend.TryLock(Any[models.ProjectLock]())).ThenReturn(true, currLock, nil, nil) l := locking.NewClient(backend) r, err := l.TryLock(project, workspace, pull, user) Ok(t, err) @@ -62,7 +62,7 @@ func TestUnlock_InvalidKey(t *testing.T) { backend := mocks.NewMockBackend() l := locking.NewClient(backend) - _, err := l.Unlock("invalidkey") + _, _, err := l.Unlock("invalidkey", true) Assert(t, err != nil, "expected err") Assert(t, strings.Contains(err.Error(), "invalid key format"), "expected err") } @@ -70,19 +70,29 @@ func TestUnlock_InvalidKey(t *testing.T) { func TestUnlock_Err(t *testing.T) { RegisterMockTestingT(t) backend := mocks.NewMockBackend() - When(backend.Unlock(Any[models.Project](), Any[string]())).ThenReturn(nil, errExpected) + When(backend.Unlock(Any[models.Project](), Any[string](), Any[bool]())).ThenReturn(nil, nil, errExpected) l := locking.NewClient(backend) - _, err := l.Unlock("owner/repo/path/workspace") + _, _, err := l.Unlock("owner/repo/path/workspace", false) Equals(t, err, err) - backend.VerifyWasCalledOnce().Unlock(project, "workspace") + backend.VerifyWasCalledOnce().Unlock(project, "workspace", false) } func TestUnlock(t *testing.T) { RegisterMockTestingT(t) backend := mocks.NewMockBackend() - When(backend.Unlock(Any[models.Project](), Any[string]())).ThenReturn(&pl, nil) + When(backend.Unlock(Any[models.Project](), Any[string](), Eq(false))).ThenReturn(&pl, nil, nil) l := locking.NewClient(backend) - lock, err := l.Unlock("owner/repo/path/workspace") + lock, _, err := l.Unlock("owner/repo/path/workspace", false) + Ok(t, err) + Equals(t, &pl, lock) +} + +func TestUnlock_UpdateQueue(t *testing.T) { + RegisterMockTestingT(t) + backend := mocks.NewMockBackend() + When(backend.Unlock(Any[models.Project](), Any[string](), Eq(true))).ThenReturn(&pl, nil, nil) + l := locking.NewClient(backend) + lock, _, err := l.Unlock("owner/repo/path/workspace", true) Ok(t, err) Equals(t, &pl, lock) } @@ -111,9 +121,9 @@ func TestList(t *testing.T) { func TestUnlockByPull(t *testing.T) { RegisterMockTestingT(t) backend := mocks.NewMockBackend() - When(backend.UnlockByPull("owner/repo", 1)).ThenReturn(nil, errExpected) + When(backend.UnlockByPull("owner/repo", 1, true)).ThenReturn(nil, nil, errExpected) l := locking.NewClient(backend) - _, err := l.UnlockByPull("owner/repo", 1) + _, _, err := l.UnlockByPull("owner/repo", 1, true) Equals(t, errExpected, err) } @@ -156,7 +166,7 @@ func TestTryLock_NoOpLocker(t *testing.T) { func TestUnlock_NoOpLocker(t *testing.T) { l := locking.NewNoOpLocker() - lock, err := l.Unlock("owner/repo/path/workspace") + lock, _, err := l.Unlock("owner/repo/path/workspace", true) Ok(t, err) Equals(t, &models.ProjectLock{}, lock) } @@ -170,7 +180,7 @@ func TestList_NoOpLocker(t *testing.T) { func TestUnlockByPull_NoOpLocker(t *testing.T) { l := locking.NewNoOpLocker() - _, err := l.UnlockByPull("owner/repo", 1) + _, _, err := l.UnlockByPull("owner/repo", 1, true) Ok(t, err) } diff --git a/server/core/locking/mocks/mock_backend.go b/server/core/locking/mocks/mock_backend.go index f2174476a1..a6a05da8b5 100644 --- a/server/core/locking/mocks/mock_backend.go +++ b/server/core/locking/mocks/mock_backend.go @@ -98,6 +98,25 @@ func (mock *MockBackend) GetPullStatus(pull models.PullRequest) (*models.PullSta return ret0, ret1 } +func (mock *MockBackend) GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockBackend().") + } + params := []pegomock.Param{project, workspace} + result := pegomock.GetGenericMockFrom(mock).Invoke("GetQueueByLock", params, []reflect.Type{reflect.TypeOf((*models.ProjectLockQueue)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 models.ProjectLockQueue + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(models.ProjectLockQueue) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + func (mock *MockBackend) List() ([]models.ProjectLock, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockBackend().") @@ -136,15 +155,16 @@ func (mock *MockBackend) LockCommand(cmdName command.Name, lockTime time.Time) ( return ret0, ret1 } -func (mock *MockBackend) TryLock(lock models.ProjectLock) (bool, models.ProjectLock, error) { +func (mock *MockBackend) TryLock(lock models.ProjectLock) (bool, models.ProjectLock, *models.EnqueueStatus, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockBackend().") } params := []pegomock.Param{lock} - result := pegomock.GetGenericMockFrom(mock).Invoke("TryLock", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem(), reflect.TypeOf((*models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + result := pegomock.GetGenericMockFrom(mock).Invoke("TryLock", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem(), reflect.TypeOf((*models.ProjectLock)(nil)).Elem(), reflect.TypeOf((**models.EnqueueStatus)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 bool var ret1 models.ProjectLock - var ret2 error + var ret2 *models.EnqueueStatus + var ret3 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].(bool) @@ -153,48 +173,59 @@ func (mock *MockBackend) TryLock(lock models.ProjectLock) (bool, models.ProjectL ret1 = result[1].(models.ProjectLock) } if result[2] != nil { - ret2 = result[2].(error) + ret2 = result[2].(*models.EnqueueStatus) + } + if result[3] != nil { + ret3 = result[3].(error) } } - return ret0, ret1, ret2 + return ret0, ret1, ret2, ret3 } -func (mock *MockBackend) Unlock(project models.Project, workspace string) (*models.ProjectLock, error) { +func (mock *MockBackend) Unlock(project models.Project, workspace string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockBackend().") } - params := []pegomock.Param{project, workspace} - result := pegomock.GetGenericMockFrom(mock).Invoke("Unlock", params, []reflect.Type{reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + params := []pegomock.Param{project, workspace, updateQueue} + result := pegomock.GetGenericMockFrom(mock).Invoke("Unlock", params, []reflect.Type{reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 *models.ProjectLock - var ret1 error + var ret1 *models.ProjectLock + var ret2 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].(*models.ProjectLock) } if result[1] != nil { - ret1 = result[1].(error) + ret1 = result[1].(*models.ProjectLock) + } + if result[2] != nil { + ret2 = result[2].(error) } } - return ret0, ret1 + return ret0, ret1, ret2 } -func (mock *MockBackend) UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) { +func (mock *MockBackend) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockBackend().") } - params := []pegomock.Param{repoFullName, pullNum} - result := pegomock.GetGenericMockFrom(mock).Invoke("UnlockByPull", params, []reflect.Type{reflect.TypeOf((*[]models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + params := []pegomock.Param{repoFullName, pullNum, updateQueue} + result := pegomock.GetGenericMockFrom(mock).Invoke("UnlockByPull", params, []reflect.Type{reflect.TypeOf((*[]models.ProjectLock)(nil)).Elem(), reflect.TypeOf((**models.DequeueStatus)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 []models.ProjectLock - var ret1 error + var ret1 *models.DequeueStatus + var ret2 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].([]models.ProjectLock) } if result[1] != nil { - ret1 = result[1].(error) + ret1 = result[1].(*models.DequeueStatus) + } + if result[2] != nil { + ret2 = result[2].(error) } } - return ret0, ret1 + return ret0, ret1, ret2 } func (mock *MockBackend) UnlockCommand(cmdName command.Name) error { @@ -395,6 +426,37 @@ func (c *MockBackend_GetPullStatus_OngoingVerification) GetAllCapturedArguments( return } +func (verifier *VerifierMockBackend) GetQueueByLock(project models.Project, workspace string) *MockBackend_GetQueueByLock_OngoingVerification { + params := []pegomock.Param{project, workspace} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "GetQueueByLock", params, verifier.timeout) + return &MockBackend_GetQueueByLock_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockBackend_GetQueueByLock_OngoingVerification struct { + mock *MockBackend + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockBackend_GetQueueByLock_OngoingVerification) GetCapturedArguments() (models.Project, string) { + project, workspace := c.GetAllCapturedArguments() + return project[len(project)-1], workspace[len(workspace)-1] +} + +func (c *MockBackend_GetQueueByLock_OngoingVerification) GetAllCapturedArguments() (_param0 []models.Project, _param1 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.Project, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.Project) + } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } + } + return +} + func (verifier *VerifierMockBackend) List() *MockBackend_List_OngoingVerification { params := []pegomock.Param{} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "List", params, verifier.timeout) @@ -470,8 +532,8 @@ func (c *MockBackend_TryLock_OngoingVerification) GetAllCapturedArguments() (_pa return } -func (verifier *VerifierMockBackend) Unlock(project models.Project, workspace string) *MockBackend_Unlock_OngoingVerification { - params := []pegomock.Param{project, workspace} +func (verifier *VerifierMockBackend) Unlock(project models.Project, workspace string, updateQueue bool) *MockBackend_Unlock_OngoingVerification { + params := []pegomock.Param{project, workspace, updateQueue} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Unlock", params, verifier.timeout) return &MockBackend_Unlock_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -481,12 +543,12 @@ type MockBackend_Unlock_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockBackend_Unlock_OngoingVerification) GetCapturedArguments() (models.Project, string) { - project, workspace := c.GetAllCapturedArguments() - return project[len(project)-1], workspace[len(workspace)-1] +func (c *MockBackend_Unlock_OngoingVerification) GetCapturedArguments() (models.Project, string, bool) { + project, workspace, updateQueue := c.GetAllCapturedArguments() + return project[len(project)-1], workspace[len(workspace)-1], updateQueue[len(updateQueue)-1] } -func (c *MockBackend_Unlock_OngoingVerification) GetAllCapturedArguments() (_param0 []models.Project, _param1 []string) { +func (c *MockBackend_Unlock_OngoingVerification) GetAllCapturedArguments() (_param0 []models.Project, _param1 []string, _param2 []bool) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]models.Project, len(c.methodInvocations)) @@ -497,12 +559,16 @@ func (c *MockBackend_Unlock_OngoingVerification) GetAllCapturedArguments() (_par for u, param := range params[1] { _param1[u] = param.(string) } + _param2 = make([]bool, len(c.methodInvocations)) + for u, param := range params[2] { + _param2[u] = param.(bool) + } } return } -func (verifier *VerifierMockBackend) UnlockByPull(repoFullName string, pullNum int) *MockBackend_UnlockByPull_OngoingVerification { - params := []pegomock.Param{repoFullName, pullNum} +func (verifier *VerifierMockBackend) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) *MockBackend_UnlockByPull_OngoingVerification { + params := []pegomock.Param{repoFullName, pullNum, updateQueue} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "UnlockByPull", params, verifier.timeout) return &MockBackend_UnlockByPull_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -512,12 +578,12 @@ type MockBackend_UnlockByPull_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockBackend_UnlockByPull_OngoingVerification) GetCapturedArguments() (string, int) { - repoFullName, pullNum := c.GetAllCapturedArguments() - return repoFullName[len(repoFullName)-1], pullNum[len(pullNum)-1] +func (c *MockBackend_UnlockByPull_OngoingVerification) GetCapturedArguments() (string, int, bool) { + repoFullName, pullNum, updateQueue := c.GetAllCapturedArguments() + return repoFullName[len(repoFullName)-1], pullNum[len(pullNum)-1], updateQueue[len(updateQueue)-1] } -func (c *MockBackend_UnlockByPull_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []int) { +func (c *MockBackend_UnlockByPull_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []int, _param2 []bool) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) @@ -528,6 +594,10 @@ func (c *MockBackend_UnlockByPull_OngoingVerification) GetAllCapturedArguments() for u, param := range params[1] { _param1[u] = param.(int) } + _param2 = make([]bool, len(c.methodInvocations)) + for u, param := range params[2] { + _param2[u] = param.(bool) + } } return } diff --git a/server/core/locking/mocks/mock_locker.go b/server/core/locking/mocks/mock_locker.go index 1efcb2c64b..3781f2d8fd 100644 --- a/server/core/locking/mocks/mock_locker.go +++ b/server/core/locking/mocks/mock_locker.go @@ -45,6 +45,25 @@ func (mock *MockLocker) GetLock(key string) (*models.ProjectLock, error) { return ret0, ret1 } +func (mock *MockLocker) GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockLocker().") + } + params := []pegomock.Param{project, workspace} + result := pegomock.GetGenericMockFrom(mock).Invoke("GetQueueByLock", params, []reflect.Type{reflect.TypeOf((*models.ProjectLockQueue)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 models.ProjectLockQueue + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(models.ProjectLockQueue) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + func (mock *MockLocker) List() (map[string]models.ProjectLock, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockLocker().") @@ -83,42 +102,50 @@ func (mock *MockLocker) TryLock(p models.Project, workspace string, pull models. return ret0, ret1 } -func (mock *MockLocker) Unlock(key string) (*models.ProjectLock, error) { +func (mock *MockLocker) Unlock(key string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockLocker().") } - params := []pegomock.Param{key} - result := pegomock.GetGenericMockFrom(mock).Invoke("Unlock", params, []reflect.Type{reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + params := []pegomock.Param{key, updateQueue} + result := pegomock.GetGenericMockFrom(mock).Invoke("Unlock", params, []reflect.Type{reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 *models.ProjectLock - var ret1 error + var ret1 *models.ProjectLock + var ret2 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].(*models.ProjectLock) } if result[1] != nil { - ret1 = result[1].(error) + ret1 = result[1].(*models.ProjectLock) + } + if result[2] != nil { + ret2 = result[2].(error) } } - return ret0, ret1 + return ret0, ret1, ret2 } -func (mock *MockLocker) UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) { +func (mock *MockLocker) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockLocker().") } - params := []pegomock.Param{repoFullName, pullNum} - result := pegomock.GetGenericMockFrom(mock).Invoke("UnlockByPull", params, []reflect.Type{reflect.TypeOf((*[]models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + params := []pegomock.Param{repoFullName, pullNum, updateQueue} + result := pegomock.GetGenericMockFrom(mock).Invoke("UnlockByPull", params, []reflect.Type{reflect.TypeOf((*[]models.ProjectLock)(nil)).Elem(), reflect.TypeOf((**models.DequeueStatus)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 []models.ProjectLock - var ret1 error + var ret1 *models.DequeueStatus + var ret2 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].([]models.ProjectLock) } if result[1] != nil { - ret1 = result[1].(error) + ret1 = result[1].(*models.DequeueStatus) + } + if result[2] != nil { + ret2 = result[2].(error) } } - return ret0, ret1 + return ret0, ret1, ret2 } func (mock *MockLocker) VerifyWasCalledOnce() *VerifierMockLocker { @@ -185,6 +212,37 @@ func (c *MockLocker_GetLock_OngoingVerification) GetAllCapturedArguments() (_par return } +func (verifier *VerifierMockLocker) GetQueueByLock(project models.Project, workspace string) *MockLocker_GetQueueByLock_OngoingVerification { + params := []pegomock.Param{project, workspace} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "GetQueueByLock", params, verifier.timeout) + return &MockLocker_GetQueueByLock_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockLocker_GetQueueByLock_OngoingVerification struct { + mock *MockLocker + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockLocker_GetQueueByLock_OngoingVerification) GetCapturedArguments() (models.Project, string) { + project, workspace := c.GetAllCapturedArguments() + return project[len(project)-1], workspace[len(workspace)-1] +} + +func (c *MockLocker_GetQueueByLock_OngoingVerification) GetAllCapturedArguments() (_param0 []models.Project, _param1 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.Project, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.Project) + } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } + } + return +} + func (verifier *VerifierMockLocker) List() *MockLocker_List_OngoingVerification { params := []pegomock.Param{} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "List", params, verifier.timeout) @@ -241,8 +299,8 @@ func (c *MockLocker_TryLock_OngoingVerification) GetAllCapturedArguments() (_par return } -func (verifier *VerifierMockLocker) Unlock(key string) *MockLocker_Unlock_OngoingVerification { - params := []pegomock.Param{key} +func (verifier *VerifierMockLocker) Unlock(key string, updateQueue bool) *MockLocker_Unlock_OngoingVerification { + params := []pegomock.Param{key, updateQueue} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Unlock", params, verifier.timeout) return &MockLocker_Unlock_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -252,24 +310,28 @@ type MockLocker_Unlock_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockLocker_Unlock_OngoingVerification) GetCapturedArguments() string { - key := c.GetAllCapturedArguments() - return key[len(key)-1] +func (c *MockLocker_Unlock_OngoingVerification) GetCapturedArguments() (string, bool) { + key, updateQueue := c.GetAllCapturedArguments() + return key[len(key)-1], updateQueue[len(updateQueue)-1] } -func (c *MockLocker_Unlock_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { +func (c *MockLocker_Unlock_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []bool) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(string) } + _param1 = make([]bool, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(bool) + } } return } -func (verifier *VerifierMockLocker) UnlockByPull(repoFullName string, pullNum int) *MockLocker_UnlockByPull_OngoingVerification { - params := []pegomock.Param{repoFullName, pullNum} +func (verifier *VerifierMockLocker) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) *MockLocker_UnlockByPull_OngoingVerification { + params := []pegomock.Param{repoFullName, pullNum, updateQueue} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "UnlockByPull", params, verifier.timeout) return &MockLocker_UnlockByPull_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -279,12 +341,12 @@ type MockLocker_UnlockByPull_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockLocker_UnlockByPull_OngoingVerification) GetCapturedArguments() (string, int) { - repoFullName, pullNum := c.GetAllCapturedArguments() - return repoFullName[len(repoFullName)-1], pullNum[len(pullNum)-1] +func (c *MockLocker_UnlockByPull_OngoingVerification) GetCapturedArguments() (string, int, bool) { + repoFullName, pullNum, updateQueue := c.GetAllCapturedArguments() + return repoFullName[len(repoFullName)-1], pullNum[len(pullNum)-1], updateQueue[len(updateQueue)-1] } -func (c *MockLocker_UnlockByPull_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []int) { +func (c *MockLocker_UnlockByPull_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []int, _param2 []bool) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) @@ -295,6 +357,10 @@ func (c *MockLocker_UnlockByPull_OngoingVerification) GetAllCapturedArguments() for u, param := range params[1] { _param1[u] = param.(int) } + _param2 = make([]bool, len(c.methodInvocations)) + for u, param := range params[2] { + _param2[u] = param.(bool) + } } return } diff --git a/server/core/redis/redis.go b/server/core/redis/redis.go index 6f7ee36a2a..d6ff9a1f25 100644 --- a/server/core/redis/redis.go +++ b/server/core/redis/redis.go @@ -19,14 +19,15 @@ var ctx = context.Background() // Redis is a database using Redis 6 type RedisDB struct { // nolint: revive - client *redis.Client + client *redis.Client + queueEnabled bool } const ( pullKeySeparator = "::" ) -func New(hostname string, port int, password string, tlsEnabled bool, insecureSkipVerify bool, db int) (*RedisDB, error) { +func New(hostname string, port int, password string, tlsEnabled bool, insecureSkipVerify bool, db int, queueEnabled bool) (*RedisDB, error) { var rdb *redis.Client var tlsConfig *tls.Config @@ -51,7 +52,8 @@ func New(hostname string, port int, password string, tlsEnabled bool, insecureSk } return &RedisDB{ - client: rdb, + client: rdb, + queueEnabled: queueEnabled, }, nil } @@ -66,49 +68,146 @@ func NewWithClient(client *redis.Client, bucket string, globalBucket string) (*R // acquired, it will return true and the lock returned will be newLock. // If the lock is not acquired, it will return false and the current // lock that is preventing this lock from being acquired. -func (r *RedisDB) TryLock(newLock models.ProjectLock) (bool, models.ProjectLock, error) { +func (r *RedisDB) TryLock(newLock models.ProjectLock) (bool, models.ProjectLock, *models.EnqueueStatus, error) { var currLock models.ProjectLock - key := r.lockKey(newLock.Project, newLock.Workspace) + lockKey := r.lockKey(newLock.Project, newLock.Workspace) newLockSerialized, _ := json.Marshal(newLock) + var enqueueStatus *models.EnqueueStatus - val, err := r.client.Get(ctx, key).Result() + val, err := r.client.Get(ctx, lockKey).Result() // if there is no run at that key then we're free to create the lock if err == redis.Nil { - err := r.client.Set(ctx, key, newLockSerialized, 0).Err() + err := r.client.Set(ctx, lockKey, newLockSerialized, 0).Err() if err != nil { - return false, currLock, errors.Wrap(err, "db transaction failed") + return false, currLock, enqueueStatus, errors.Wrap(err, "db transaction failed") } - return true, newLock, nil + return true, newLock, enqueueStatus, nil } else if err != nil { // otherwise the lock fails, return to caller the run that's holding the lock - return false, currLock, errors.Wrap(err, "db transaction failed") + return false, currLock, enqueueStatus, errors.Wrap(err, "db transaction failed") } else { if err := json.Unmarshal([]byte(val), &currLock); err != nil { - return false, currLock, errors.Wrap(err, "failed to deserialize current lock") + return false, currLock, enqueueStatus, errors.Wrap(err, "failed to deserialize current lock") + } + // checking if current lock is with the same PR or if queue is disabled + if currLock.Pull.Num == newLock.Pull.Num || !r.queueEnabled { + return false, currLock, enqueueStatus, nil + } else { + enqueueStatus, err = r.enqueue(newLock) + return false, currLock, enqueueStatus, err } - return false, currLock, nil } } +func (r *RedisDB) enqueue(newLock models.ProjectLock) (*models.EnqueueStatus, error) { + queueKey := r.queueKey(newLock.Project, newLock.Workspace) + currQueueSerialized, err := r.client.Get(ctx, queueKey).Result() + var queue models.ProjectLockQueue + if err == redis.Nil { + queue = models.ProjectLockQueue{} + } else if err != nil { + return nil, errors.Wrap(err, "db transaction failed") + } else { + if err := json.Unmarshal([]byte(currQueueSerialized), &queue); err != nil { + return nil, errors.Wrap(err, "failed to deserialize current queue") + } + } + // Lock is already in the queue + if indexInQueue := queue.FindPullRequest(newLock.Pull.Num); indexInQueue > -1 { + enqueueStatus := &models.EnqueueStatus{ + Status: models.AlreadyInTheQueue, + QueueDepth: indexInQueue + 1, + } + return enqueueStatus, nil + } + + // Not in the queue, add it + newQueue := append(queue, newLock) + newQueueSerialized, err := json.Marshal(newQueue) + if err != nil { + return nil, errors.Wrap(err, "serializing") + } + err = r.client.Set(ctx, queueKey, newQueueSerialized, 0).Err() + if err != nil { + return nil, errors.Wrap(err, "db transaction failed") + } + enqueueStatus := &models.EnqueueStatus{ + Status: models.Enqueued, + QueueDepth: len(newQueue), + } + return enqueueStatus, nil +} + // Unlock attempts to unlock the project and workspace. // If there is no lock, then it will return a nil pointer. // If there is a lock, then it will delete it, and then return a pointer -// to the deleted lock. -func (r *RedisDB) Unlock(project models.Project, workspace string) (*models.ProjectLock, error) { +// to the deleted lock. If updateQueue is true, it will also grant the +// lock to the next PR in the queue, update the queue and return the dequeued lock. +func (r *RedisDB) Unlock(project models.Project, workspace string, updateQueue bool) (*models.ProjectLock, *models.ProjectLock, error) { var lock models.ProjectLock - key := r.lockKey(project, workspace) + lockKey := r.lockKey(project, workspace) - val, err := r.client.Get(ctx, key).Result() + val, err := r.client.Get(ctx, lockKey).Result() + if err == redis.Nil { + return nil, nil, nil + } else if err != nil { + return nil, nil, errors.Wrap(err, "db transaction failed") + } else { + if err := json.Unmarshal([]byte(val), &lock); err != nil { + return nil, nil, errors.Wrap(err, "failed to deserialize current lock") + } + r.client.Del(ctx, lockKey) + // Dequeue next item + if r.queueEnabled && updateQueue { + dequeuedLock, err := r.dequeue(project, workspace, lockKey) + return &lock, dequeuedLock, err + } + return &lock, nil, nil + } +} + +func (r *RedisDB) dequeue(project models.Project, workspace string, lockKey string) (*models.ProjectLock, error) { + queueKey := r.queueKey(project, workspace) + currQueueSerialized, err := r.client.Get(ctx, queueKey).Result() if err == redis.Nil { return nil, nil } else if err != nil { return nil, errors.Wrap(err, "db transaction failed") } else { - if err := json.Unmarshal([]byte(val), &lock); err != nil { - return nil, errors.Wrap(err, "failed to deserialize current lock") + var currQueue models.ProjectLockQueue + if err := json.Unmarshal([]byte(currQueueSerialized), &currQueue); err != nil { + return nil, errors.Wrap(err, "failed to deserialize queue for current lock") } - r.client.Del(ctx, key) - return &lock, nil + + dequeuedLock, newQueue := currQueue.Dequeue() + + // A lock was dequeued - update current lock holder + if dequeuedLock != nil { + dequeuedLockSerialized, err := json.Marshal(*dequeuedLock) + if err != nil { + return dequeuedLock, errors.Wrap(err, "serializing") + } + err = r.client.Set(ctx, lockKey, dequeuedLockSerialized, 0).Err() + if err != nil { + return dequeuedLock, errors.Wrap(err, "db transaction failed") + } + } + + // New queue is empty and can be deleted + if len(newQueue) == 0 { + r.client.Del(ctx, queueKey) + return dequeuedLock, nil + } + + newQueueSerialized, err := json.Marshal(newQueue) + if err != nil { + return dequeuedLock, errors.Wrap(err, "serializing") + } + err = r.client.Set(ctx, queueKey, newQueueSerialized, 0).Err() + if err != nil { + return dequeuedLock, errors.Wrap(err, "db transaction failed") + } + return dequeuedLock, nil } } @@ -156,32 +255,66 @@ func (r *RedisDB) GetLock(project models.Project, workspace string) (*models.Pro } // UnlockByPull deletes all locks associated with that pull request and returns them. -func (r *RedisDB) UnlockByPull(repoFullName string, pullNum int) ([]models.ProjectLock, error) { +func (r *RedisDB) UnlockByPull(repoFullName string, pullNum int, updateQueue bool) ([]models.ProjectLock, *models.DequeueStatus, error) { var locks []models.ProjectLock + var dequeuedLocks = make([]models.ProjectLock, 0, len(locks)) iter := r.client.Scan(ctx, 0, fmt.Sprintf("pr/%s*", repoFullName), 0).Iterator() for iter.Next(ctx) { var lock models.ProjectLock val, err := r.client.Get(ctx, iter.Val()).Result() if err != nil { - return nil, errors.Wrap(err, "db transaction failed") + return nil, nil, errors.Wrap(err, "db transaction failed") } if err := json.Unmarshal([]byte(val), &lock); err != nil { - return locks, errors.Wrap(err, fmt.Sprintf("failed to deserialize lock at key '%s'", iter.Val())) + return locks, nil, errors.Wrap(err, fmt.Sprintf("failed to deserialize lock at key '%s'", iter.Val())) } if lock.Pull.Num == pullNum { locks = append(locks, lock) - if _, err := r.Unlock(lock.Project, lock.Workspace); err != nil { - return locks, errors.Wrapf(err, "unlocking repo %s, path %s, workspace %s", lock.Project.RepoFullName, lock.Project.Path, lock.Workspace) + _, dequeuedLock, err := r.Unlock(lock.Project, lock.Workspace, updateQueue) + if err != nil { + return locks, nil, errors.Wrapf(err, "unlocking repo %s, path %s, workspace %s", lock.Project.RepoFullName, lock.Project.Path, lock.Workspace) + } + if dequeuedLock != nil { + dequeuedLocks = append(dequeuedLocks, *dequeuedLock) } } } if err := iter.Err(); err != nil { - return locks, errors.Wrap(err, "db transaction failed") + return locks, &models.DequeueStatus{ProjectLocks: dequeuedLocks}, errors.Wrap(err, "db transaction failed") } - return locks, nil + return locks, &models.DequeueStatus{ProjectLocks: dequeuedLocks}, nil +} + +// GetQueueByLock returns the queue for a given lock. +// If queue is not enabled or if no such queue exists, it returns a nil pointer. +func (r *RedisDB) GetQueueByLock(project models.Project, workspace string) (models.ProjectLockQueue, error) { + if !r.queueEnabled { + return nil, nil + } + + key := r.queueKey(project, workspace) + + queueSerialized, err := r.client.Get(ctx, key).Result() + if err == redis.Nil { + // Queue not found + return nil, nil + } else if err != nil { + return nil, errors.Wrap(err, "db transaction failed") + } else { + // Queue is found, deserialize and return + var queue models.ProjectLockQueue + if err := json.Unmarshal([]byte(queueSerialized), &queue); err != nil { + return nil, errors.Wrapf(err, "deserializing queue at key %q", key) + } + for _, lock := range queue { + // need to set it to Local after deserialization due to https://github.com/golang/go/issues/19486 + lock.Time = lock.Time.Local() + } + return queue, nil + } } func (r *RedisDB) LockCommand(cmdName command.Name, lockTime time.Time) (*command.Lock, error) { @@ -394,6 +527,10 @@ func (r *RedisDB) lockKey(p models.Project, workspace string) string { return fmt.Sprintf("pr/%s/%s/%s", p.RepoFullName, p.Path, workspace) } +func (r *RedisDB) queueKey(p models.Project, workspace string) string { + return fmt.Sprintf("queue/%s/%s/%s", p.RepoFullName, p.Path, workspace) +} + func (r *RedisDB) commandLockKey(cmdName command.Name) string { return fmt.Sprintf("global/%s/lock", cmdName) } diff --git a/server/core/redis/redis_test.go b/server/core/redis/redis_test.go index 63bd1a0873..4d8cfb71a6 100644 --- a/server/core/redis/redis_test.go +++ b/server/core/redis/redis_test.go @@ -79,6 +79,88 @@ func TestRedisWithTLS(t *testing.T) { _ = newTestRedisTLS(s) } +func TestGetQueueByLock(t *testing.T) { + t.Log("Getting Queue By Lock") + s := miniredis.RunT(t) + r := newTestRedisQueue(s) + + // queue doesn't exist -> should return nil + queue, err := r.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Assert(t, queue == nil, "exp nil") + + _, _, _, err = r.TryLock(lock) + Ok(t, err) + + lock1 := lock + lock1.Pull.Num = 2 + _, _, _, err = r.TryLock(lock1) // this lock should be queued + Ok(t, err) + + lock2 := lock + lock2.Pull.Num = 3 + _, _, _, err = r.TryLock(lock2) // this lock should be queued + Ok(t, err) + + queue, _ = r.GetQueueByLock(lock.Project, lock.Workspace) + Equals(t, 2, len(queue)) +} + +func TestSingleQueue(t *testing.T) { + t.Log("locking should return correct EnqueueStatus for a single queue") + s := miniredis.RunT(t) + r := newTestRedisQueue(s) + + lockAcquired, _, _, err := r.TryLock(lock) + Ok(t, err) + Equals(t, true, lockAcquired) + + secondLock := lock + secondLock.Pull.Num = pullNum + 1 + lockAcquired, _, enqueueStatus, err := r.TryLock(secondLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, models.Enqueued, enqueueStatus.Status) + Equals(t, 1, enqueueStatus.QueueDepth) + + lockAcquired, _, enqueueStatus, err = r.TryLock(secondLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, models.AlreadyInTheQueue, enqueueStatus.Status) + Equals(t, 1, enqueueStatus.QueueDepth) + + thirdLock := lock + thirdLock.Pull.Num = pullNum + 2 + lockAcquired, _, enqueueStatus, err = r.TryLock(thirdLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, models.Enqueued, enqueueStatus.Status) + Equals(t, 2, enqueueStatus.QueueDepth) +} + +func TestMultipleQueues(t *testing.T) { + t.Log("locking should return correct EnqueueStatus for multiple queues") + s := miniredis.RunT(t) + r := newTestRedisQueue(s) + + lockAcquired, _, _, err := r.TryLock(lock) + Ok(t, err) + Equals(t, true, lockAcquired) + + lockInDifferentWorkspace := lock + lockInDifferentWorkspace.Workspace = "different-workspace" + lockAcquired, _, _, err = r.TryLock(lockInDifferentWorkspace) + Ok(t, err) + Equals(t, true, lockAcquired) + + secondLock := lock + secondLock.Pull.Num = pullNum + 1 + lockAcquired, _, enqueueStatus, err := r.TryLock(secondLock) + Ok(t, err) + Equals(t, false, lockAcquired) + Equals(t, 1, enqueueStatus.QueueDepth) +} + func TestLockCommandNotSet(t *testing.T) { t.Log("retrieving apply lock when there are none should return empty LockCommand") s := miniredis.RunT(t) @@ -148,7 +230,7 @@ func TestMixedLocksPresent(t *testing.T) { _, err := r.LockCommand(command.Apply, timeNow) Ok(t, err) - _, _, err = r.TryLock(lock) + _, _, _, err = r.TryLock(lock) Ok(t, err) ls, err := r.List() @@ -169,7 +251,7 @@ func TestListOneLock(t *testing.T) { t.Log("listing locks when there is one should return it") s := miniredis.RunT(t) r := newTestRedis(s) - _, _, err := r.TryLock(lock) + _, _, _, err := r.TryLock(lock) Ok(t, err) ls, err := r.List() Ok(t, err) @@ -192,7 +274,7 @@ func TestListMultipleLocks(t *testing.T) { for _, r := range repos { newLock := lock newLock.Project = models.NewProject(r, "path") - _, _, err := rdb.TryLock(newLock) + _, _, _, err := rdb.TryLock(newLock) Ok(t, err) } ls, err := rdb.List() @@ -213,9 +295,9 @@ func TestListAddRemove(t *testing.T) { t.Log("listing after adding and removing should return none") s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) - _, err = rdb.Unlock(project, workspace) + _, _, err = rdb.Unlock(project, workspace, true) Ok(t, err) ls, err := rdb.List() @@ -227,7 +309,7 @@ func TestLockingNoLocks(t *testing.T) { t.Log("with no locks yet, lock should succeed") s := miniredis.RunT(t) rdb := newTestRedis(s) - acquired, currLock, err := rdb.TryLock(lock) + acquired, currLock, _, err := rdb.TryLock(lock) Ok(t, err) Equals(t, true, acquired) Equals(t, lock, currLock) @@ -236,15 +318,15 @@ func TestLockingNoLocks(t *testing.T) { func TestLockingExistingLock(t *testing.T) { t.Log("if there is an existing lock, lock should...") s := miniredis.RunT(t) - rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + rdb := newTestRedisQueue(s) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) t.Log("...succeed if the new project has a different path") { newLock := lock newLock.Project = models.NewProject(project.RepoFullName, "different/path") - acquired, currLock, err := rdb.TryLock(newLock) + acquired, currLock, _, err := rdb.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, pullNum, currLock.Pull.Num) @@ -254,7 +336,7 @@ func TestLockingExistingLock(t *testing.T) { { newLock := lock newLock.Workspace = "different-workspace" - acquired, currLock, err := rdb.TryLock(newLock) + acquired, currLock, _, err := rdb.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, newLock, currLock) @@ -264,20 +346,20 @@ func TestLockingExistingLock(t *testing.T) { { newLock := lock newLock.Project = models.NewProject("different/repo", project.Path) - acquired, currLock, err := rdb.TryLock(newLock) + acquired, currLock, _, err := rdb.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, newLock, currLock) } - t.Log("...not succeed if the new project only has a different pullNum") + t.Log("...succeed if the new project has a different pullNum, the locking attempt will be queued") { newLock := lock newLock.Pull.Num = lock.Pull.Num + 1 - acquired, currLock, err := rdb.TryLock(newLock) + acquired, _, enqueueStatus, err := rdb.TryLock(newLock) Ok(t, err) Equals(t, false, acquired) - Equals(t, currLock.Pull.Num, pullNum) + Equals(t, 1, enqueueStatus.QueueDepth) } } @@ -285,7 +367,7 @@ func TestUnlockingNoLocks(t *testing.T) { t.Log("unlocking with no locks should succeed") s := miniredis.RunT(t) rdb := newTestRedis(s) - _, err := rdb.Unlock(project, workspace) + _, _, err := rdb.Unlock(project, workspace, true) Ok(t, err) } @@ -295,9 +377,9 @@ func TestUnlocking(t *testing.T) { s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) - _, err = rdb.Unlock(project, workspace) + _, _, err = rdb.Unlock(project, workspace, true) Ok(t, err) // should be no locks listed @@ -308,7 +390,7 @@ func TestUnlocking(t *testing.T) { // should be able to re-lock that repo with a new pull num newLock := lock newLock.Pull.Num = lock.Pull.Num + 1 - acquired, currLock, err := rdb.TryLock(newLock) + acquired, currLock, _, err := rdb.TryLock(newLock) Ok(t, err) Equals(t, true, acquired) Equals(t, newLock, currLock) @@ -319,32 +401,32 @@ func TestUnlockingMultiple(t *testing.T) { s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) new := lock new.Project.RepoFullName = "new/repo" - _, _, err = rdb.TryLock(new) + _, _, _, err = rdb.TryLock(new) Ok(t, err) new2 := lock new2.Project.Path = "new/path" - _, _, err = rdb.TryLock(new2) + _, _, _, err = rdb.TryLock(new2) Ok(t, err) new3 := lock new3.Workspace = "new-workspace" - _, _, err = rdb.TryLock(new3) + _, _, _, err = rdb.TryLock(new3) Ok(t, err) // now try and unlock them - _, err = rdb.Unlock(new3.Project, new3.Workspace) + _, _, err = rdb.Unlock(new3.Project, new3.Workspace, true) Ok(t, err) - _, err = rdb.Unlock(new2.Project, workspace) + _, _, err = rdb.Unlock(new2.Project, workspace, true) Ok(t, err) - _, err = rdb.Unlock(new.Project, workspace) + _, _, err = rdb.Unlock(new.Project, workspace, true) Ok(t, err) - _, err = rdb.Unlock(project, workspace) + _, _, err = rdb.Unlock(project, workspace, true) Ok(t, err) // should be none left @@ -358,7 +440,7 @@ func TestUnlockByPullNone(t *testing.T) { s := miniredis.RunT(t) rdb := newTestRedis(s) - _, err := rdb.UnlockByPull("any/repo", 1) + _, _, err := rdb.UnlockByPull("any/repo", 1, true) Ok(t, err) } @@ -366,12 +448,12 @@ func TestUnlockByPullOne(t *testing.T) { t.Log("with one lock, UnlockByPull should...") s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) t.Log("...delete nothing when its the same repo but a different pull") { - _, err := rdb.UnlockByPull(project.RepoFullName, pullNum+1) + _, _, err := rdb.UnlockByPull(project.RepoFullName, pullNum+1, true) Ok(t, err) ls, err := rdb.List() Ok(t, err) @@ -379,7 +461,7 @@ func TestUnlockByPullOne(t *testing.T) { } t.Log("...delete nothing when its the same pull but a different repo") { - _, err := rdb.UnlockByPull("different/repo", pullNum) + _, _, err := rdb.UnlockByPull("different/repo", pullNum, true) Ok(t, err) ls, err := rdb.List() Ok(t, err) @@ -387,7 +469,7 @@ func TestUnlockByPullOne(t *testing.T) { } t.Log("...delete the lock when its the same repo and pull") { - _, err := rdb.UnlockByPull(project.RepoFullName, pullNum) + _, _, err := rdb.UnlockByPull(project.RepoFullName, pullNum, true) Ok(t, err) ls, err := rdb.List() Ok(t, err) @@ -399,12 +481,12 @@ func TestUnlockByPullAfterUnlock(t *testing.T) { t.Log("after locking and unlocking, UnlockByPull should be successful") s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) - _, err = rdb.Unlock(project, workspace) + _, _, err = rdb.Unlock(project, workspace, true) Ok(t, err) - _, err = rdb.UnlockByPull(project.RepoFullName, pullNum) + _, _, err = rdb.UnlockByPull(project.RepoFullName, pullNum, true) Ok(t, err) ls, err := rdb.List() Ok(t, err) @@ -415,17 +497,17 @@ func TestUnlockByPullMatching(t *testing.T) { t.Log("UnlockByPull should delete all locks in that repo and pull num") s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) // add additional locks with the same repo and pull num but different paths/workspaces new := lock new.Project.Path = "dif/path" - _, _, err = rdb.TryLock(new) + _, _, _, err = rdb.TryLock(new) Ok(t, err) new2 := lock new2.Workspace = "new-workspace" - _, _, err = rdb.TryLock(new2) + _, _, _, err = rdb.TryLock(new2) Ok(t, err) // there should now be 3 @@ -434,13 +516,112 @@ func TestUnlockByPullMatching(t *testing.T) { Equals(t, 3, len(ls)) // should all be unlocked - _, err = rdb.UnlockByPull(project.RepoFullName, pullNum) + _, _, err = rdb.UnlockByPull(project.RepoFullName, pullNum, true) Ok(t, err) ls, err = rdb.List() Ok(t, err) Equals(t, 0, len(ls)) } +func TestDequeueAfterUnlock(t *testing.T) { + t.Log("unlocking should dequeue and grant lock to the next ProjectLock") + s := miniredis.RunT(t) + r := newTestRedisQueue(s) + + // first lock acquired + _, _, _, err := r.TryLock(lock) + Ok(t, err) + + // second lock enqueued + new := lock + new.Pull.Num = pullNum + 1 + _, _, _, err = r.TryLock(new) + Ok(t, err) + + // third lock enqueued + new2 := lock + new2.Pull.Num = pullNum + 2 + _, _, _, err = r.TryLock(new2) + Ok(t, err) + queue, err := r.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Equals(t, 2, len(queue)) + Equals(t, new.Pull, queue[0].Pull) + Equals(t, new2.Pull, queue[1].Pull) + + // first lock unlocked -> second lock dequeued and lock acquired + _, dequeuedLock, err := r.Unlock(lock.Project, lock.Workspace, true) + Ok(t, err) + queue, err = r.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Equals(t, new, *dequeuedLock) + Equals(t, 1, len(queue)) + Equals(t, new2.Pull, queue[0].Pull) + + // second lock unlocked without touching the queue + _, dequeuedLock, err = r.Unlock(new.Project, new.Workspace, false) + Ok(t, err) + Assert(t, dequeuedLock == nil, "exp nil") + queue, err = r.GetQueueByLock(lock.Project, lock.Workspace) + Ok(t, err) + Equals(t, 1, len(queue)) + Equals(t, new2.Pull, queue[0].Pull) + + l, err := r.GetLock(project, workspace) + Ok(t, err) + Assert(t, l == nil, "exp nil") + + // bring the second lock again + _, _, _, err = r.TryLock(new) + Ok(t, err) + + // second lock unlocked -> third lock dequeued and lock acquired + _, dequeuedLock, err = r.Unlock(new.Project, new.Workspace, true) + Ok(t, err) + Equals(t, new2, *dequeuedLock) + + // Queue is deleted when empty + queue, err = r.GetQueueByLock(new2.Project, new2.Workspace) + Ok(t, err) + Assert(t, queue == nil, "exp nil") + + // third lock unlocked -> no more locks in the queue + _, dequeuedLock, err = r.Unlock(new2.Project, new2.Workspace, true) + Ok(t, err) + Equals(t, (*models.ProjectLock)(nil), dequeuedLock) + +} + +func TestDequeueAfterUnlockByPull(t *testing.T) { + t.Log("unlocking by pull should dequeue and grant lock to all dequeued ProjectLocks") + s := miniredis.RunT(t) + r := newTestRedisQueue(s) + + _, _, _, err := r.TryLock(lock) + Ok(t, err) + + lock2 := lock + lock2.Workspace = "different-workspace" + _, _, _, err = r.TryLock(lock2) + Ok(t, err) + + lock3 := lock + lock3.Pull.Num = pullNum + 1 + _, _, _, err = r.TryLock(lock3) + Ok(t, err) + + lock4 := lock + lock4.Workspace = "different-workspace" + lock4.Pull.Num = pullNum + 1 + _, _, _, err = r.TryLock(lock4) + Ok(t, err) + + _, dequeueStatus, err := r.UnlockByPull(project.RepoFullName, pullNum, true) + Ok(t, err) + + Equals(t, 2, len(dequeueStatus.ProjectLocks)) +} + func TestGetLockNotThere(t *testing.T) { t.Log("getting a lock that doesn't exist should return a nil pointer") s := miniredis.RunT(t) @@ -454,7 +635,7 @@ func TestGetLock(t *testing.T) { t.Log("getting a lock should return the lock") s := miniredis.RunT(t) rdb := newTestRedis(s) - _, _, err := rdb.TryLock(lock) + _, _, _, err := rdb.TryLock(lock) Ok(t, err) l, err := rdb.GetLock(project, workspace) @@ -918,7 +1099,7 @@ func TestPullStatus_UpdateMerge_ApprovePolicies(t *testing.T) { } func newTestRedis(mr *miniredis.Miniredis) *redis.RedisDB { - r, err := redis.New(mr.Host(), mr.Server().Addr().Port, "", false, false, 0) + r, err := redis.New(mr.Host(), mr.Server().Addr().Port, "", false, false, 0, false) if err != nil { panic(errors.Wrap(err, "failed to create test redis client")) } @@ -926,7 +1107,15 @@ func newTestRedis(mr *miniredis.Miniredis) *redis.RedisDB { } func newTestRedisTLS(mr *miniredis.Miniredis) *redis.RedisDB { - r, err := redis.New(mr.Host(), mr.Server().Addr().Port, "", true, true, 0) + r, err := redis.New(mr.Host(), mr.Server().Addr().Port, "", true, true, 0, false) + if err != nil { + panic(errors.Wrap(err, "failed to create test redis client")) + } + return r +} + +func newTestRedisQueue(mr *miniredis.Miniredis) *redis.RedisDB { + r, err := redis.New(mr.Host(), mr.Server().Addr().Port, "", false, false, 0, true) if err != nil { panic(errors.Wrap(err, "failed to create test redis client")) } diff --git a/server/events/apply_command_runner_test.go b/server/events/apply_command_runner_test.go index 27117008a6..a2c1084ac7 100644 --- a/server/events/apply_command_runner_test.go +++ b/server/events/apply_command_runner_test.go @@ -138,7 +138,7 @@ func TestApplyCommandRunner_IsSilenced(t *testing.T) { t.Run(c.Description, func(t *testing.T) { // create an empty DB tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) vcsClient := setup(t, func(tc *TestConfig) { diff --git a/server/events/command_runner_test.go b/server/events/command_runner_test.go index 3b652d73db..e2179a0296 100644 --- a/server/events/command_runner_test.go +++ b/server/events/command_runner_test.go @@ -84,7 +84,7 @@ func setup(t *testing.T, options ...func(testConfig *TestConfig)) *vcsmocks.Mock // create an empty DB tmp := t.TempDir() - defaultBoltDB, err := db.New(tmp) + defaultBoltDB, err := db.New(tmp, false) Ok(t, err) testConfig := &TestConfig{ @@ -612,7 +612,7 @@ func TestRunUnlockCommandFail_VCSComment(t *testing.T) { modelPull := models.PullRequest{BaseRepo: testdata.GithubRepo, State: models.OpenPullState, Num: testdata.Pull.Num} When(githubGetter.GetPullRequest(testdata.GithubRepo, testdata.Pull.Num)).ThenReturn(pull, nil) When(eventParsing.ParseGithubPull(pull)).ThenReturn(modelPull, modelPull.BaseRepo, testdata.GithubRepo, nil) - When(deleteLockCommand.DeleteLocksByPull(testdata.GithubRepo.FullName, testdata.Pull.Num)).ThenReturn(0, errors.New("err")) + When(deleteLockCommand.DeleteLocksByPull(testdata.GithubRepo.FullName, testdata.Pull.Num)).ThenReturn(0, nil, errors.New("err")) ch.RunCommentCommand(testdata.GithubRepo, &testdata.GithubRepo, nil, testdata.User, testdata.Pull.Num, &events.CommentCommand{Name: command.Unlock}) @@ -622,7 +622,7 @@ func TestRunUnlockCommandFail_VCSComment(t *testing.T) { func TestRunAutoplanCommand_DeletePlans(t *testing.T) { setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -643,13 +643,13 @@ func TestRunAutoplanCommand_DeletePlans(t *testing.T) { testdata.Pull.BaseRepo = testdata.GithubRepo ch.RunAutoplanCommand(testdata.GithubRepo, testdata.GithubRepo, testdata.Pull, testdata.User) pendingPlanFinder.VerifyWasCalledOnce().DeletePlans(tmp) - lockingLocker.VerifyWasCalledOnce().UnlockByPull(testdata.Pull.BaseRepo.FullName, testdata.Pull.Num) + lockingLocker.VerifyWasCalledOnce().UnlockByPull(testdata.Pull.BaseRepo.FullName, testdata.Pull.Num, true) } func TestRunGenericPlanCommand_DeletePlans(t *testing.T) { setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -665,13 +665,13 @@ func TestRunGenericPlanCommand_DeletePlans(t *testing.T) { testdata.Pull.BaseRepo = testdata.GithubRepo ch.RunCommentCommand(testdata.GithubRepo, nil, nil, testdata.User, testdata.Pull.Num, &events.CommentCommand{Name: command.Plan}) pendingPlanFinder.VerifyWasCalledOnce().DeletePlans(tmp) - lockingLocker.VerifyWasCalledOnce().UnlockByPull(testdata.Pull.BaseRepo.FullName, testdata.Pull.Num) + lockingLocker.VerifyWasCalledOnce().UnlockByPull(testdata.Pull.BaseRepo.FullName, testdata.Pull.Num, false) } func TestRunSpecificPlanCommandDoesnt_DeletePlans(t *testing.T) { setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -691,7 +691,7 @@ func TestRunAutoplanCommandWithError_DeletePlans(t *testing.T) { vcsClient := setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -743,7 +743,7 @@ func TestRunGenericPlanCommand_DiscardApprovals(t *testing.T) { }) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -759,7 +759,7 @@ func TestRunGenericPlanCommand_DiscardApprovals(t *testing.T) { testdata.Pull.BaseRepo = testdata.GithubRepo ch.RunCommentCommand(testdata.GithubRepo, nil, nil, testdata.User, testdata.Pull.Num, &events.CommentCommand{Name: command.Plan}) pendingPlanFinder.VerifyWasCalledOnce().DeletePlans(tmp) - lockingLocker.VerifyWasCalledOnce().UnlockByPull(testdata.Pull.BaseRepo.FullName, testdata.Pull.Num) + lockingLocker.VerifyWasCalledOnce().UnlockByPull(testdata.Pull.BaseRepo.FullName, testdata.Pull.Num, false) vcsClient.VerifyWasCalledOnce().DiscardReviews(Any[models.Repo](), Any[models.PullRequest]()) } @@ -768,7 +768,7 @@ func TestFailedApprovalCreatesFailedStatusUpdate(t *testing.T) { t.Log("if \"atlantis approve_policies\" is run by non policy owner policy check status fails.") setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -813,7 +813,7 @@ func TestApprovedPoliciesUpdateFailedPolicyStatus(t *testing.T) { t.Log("if \"atlantis approve_policies\" is run by policy owner all policy checks are approved.") setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -868,7 +868,7 @@ func TestApplyMergeablityWhenPolicyCheckFails(t *testing.T) { t.Log("if \"atlantis apply\" is run with failing policy check then apply is not performed") setup(t) tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB @@ -946,7 +946,7 @@ func TestRunApply_DiscardedProjects(t *testing.T) { autoMerger.GlobalAutomerge = true defer func() { autoMerger.GlobalAutomerge = false }() tmp := t.TempDir() - boltDB, err := db.New(tmp) + boltDB, err := db.New(tmp, false) Ok(t, err) dbUpdater.Backend = boltDB applyCommandRunner.Backend = boltDB diff --git a/server/events/delete_lock_command.go b/server/events/delete_lock_command.go index ccd7f5d146..b71814fe1b 100644 --- a/server/events/delete_lock_command.go +++ b/server/events/delete_lock_command.go @@ -10,8 +10,8 @@ import ( // DeleteLockCommand is the first step after a command request has been parsed. type DeleteLockCommand interface { - DeleteLock(id string) (*models.ProjectLock, error) - DeleteLocksByPull(repoFullName string, pullNum int) (int, error) + DeleteLock(id string) (*models.ProjectLock, *models.ProjectLock, error) + DeleteLocksByPull(repoFullName string, pullNum int) (int, *models.DequeueStatus, error) } // DefaultDeleteLockCommand deletes a specific lock after a request from the LocksController. @@ -24,13 +24,15 @@ type DefaultDeleteLockCommand struct { } // DeleteLock handles deleting the lock at id -func (l *DefaultDeleteLockCommand) DeleteLock(id string) (*models.ProjectLock, error) { - lock, err := l.Locker.Unlock(id) +func (l *DefaultDeleteLockCommand) DeleteLock(id string) (*models.ProjectLock, *models.ProjectLock, error) { + // TODO(Ghais) extend the tests + // TODO(Ghais) #9 When the lock(s) has(ve) been explicitly removed, also dequeue the next PR(s) + lock, dequeuedLock, err := l.Locker.Unlock(id, true) if err != nil { - return nil, err + return nil, nil, err } if lock == nil { - return nil, nil + return nil, nil, nil } // The locks controller currently has no implementation of Atlantis project names, so this is hardcoded to an empty string. @@ -39,22 +41,22 @@ func (l *DefaultDeleteLockCommand) DeleteLock(id string) (*models.ProjectLock, e removeErr := l.WorkingDir.DeletePlan(lock.Pull.BaseRepo, lock.Pull, lock.Workspace, lock.Project.Path, projectName) if removeErr != nil { l.Logger.Warn("Failed to delete plan: %s", removeErr) - return nil, removeErr + return nil, nil, removeErr } - return lock, nil + return lock, dequeuedLock, nil } // DeleteLocksByPull handles deleting all locks for the pull request -func (l *DefaultDeleteLockCommand) DeleteLocksByPull(repoFullName string, pullNum int) (int, error) { - locks, err := l.Locker.UnlockByPull(repoFullName, pullNum) +func (l *DefaultDeleteLockCommand) DeleteLocksByPull(repoFullName string, pullNum int) (int, *models.DequeueStatus, error) { + locks, dequeueStatus, err := l.Locker.UnlockByPull(repoFullName, pullNum, true) numLocks := len(locks) if err != nil { - return numLocks, err + return numLocks, dequeueStatus, err } if numLocks == 0 { l.Logger.Debug("No locks found for pull") - return numLocks, nil + return numLocks, dequeueStatus, nil } for i := 0; i < numLocks; i++ { @@ -62,7 +64,7 @@ func (l *DefaultDeleteLockCommand) DeleteLocksByPull(repoFullName string, pullNu l.deleteWorkingDir(lock) } - return numLocks, nil + return numLocks, dequeueStatus, nil } func (l *DefaultDeleteLockCommand) deleteWorkingDir(lock models.ProjectLock) { diff --git a/server/events/delete_lock_command_test.go b/server/events/delete_lock_command_test.go index ade6e64362..af6660c86d 100644 --- a/server/events/delete_lock_command_test.go +++ b/server/events/delete_lock_command_test.go @@ -17,12 +17,12 @@ func TestDeleteLock_LockerErr(t *testing.T) { t.Log("If there is an error retrieving the lock, we return the error") RegisterMockTestingT(t) l := lockmocks.NewMockLocker() - When(l.Unlock("id")).ThenReturn(nil, errors.New("err")) + When(l.Unlock("id", true)).ThenReturn(nil, nil, errors.New("err")) dlc := events.DefaultDeleteLockCommand{ Locker: l, Logger: logging.NewNoopLogger(t), } - _, err := dlc.DeleteLock("id") + _, _, err := dlc.DeleteLock("id") ErrEquals(t, "err", err) } @@ -30,12 +30,12 @@ func TestDeleteLock_None(t *testing.T) { t.Log("If there is no lock at that ID we return nil") RegisterMockTestingT(t) l := lockmocks.NewMockLocker() - When(l.Unlock("id")).ThenReturn(nil, nil) + When(l.Unlock("id", true)).ThenReturn(nil, nil, nil) dlc := events.DefaultDeleteLockCommand{ Locker: l, Logger: logging.NewNoopLogger(t), } - lock, err := dlc.DeleteLock("id") + lock, _, err := dlc.DeleteLock("id") Ok(t, err) Assert(t, lock == nil, "lock was not nil") } @@ -44,7 +44,7 @@ func TestDeleteLock_Success(t *testing.T) { t.Log("Delete lock deletes successfully the plan file") RegisterMockTestingT(t) l := lockmocks.NewMockLocker() - When(l.Unlock("id")).ThenReturn(&models.ProjectLock{}, nil) + When(l.Unlock("id", true)).ThenReturn(&models.ProjectLock{}, &models.ProjectLock{}, nil) workingDir := events.NewMockWorkingDir() workingDirLocker := events.NewDefaultWorkingDirLocker() workspace := "workspace" @@ -53,16 +53,16 @@ func TestDeleteLock_Success(t *testing.T) { pull := models.PullRequest{ BaseRepo: models.Repo{FullName: "owner/repo"}, } - When(l.Unlock("id")).ThenReturn(&models.ProjectLock{ + When(l.Unlock("id", true)).ThenReturn(&models.ProjectLock{ Pull: pull, Workspace: workspace, Project: models.Project{ Path: path, RepoFullName: pull.BaseRepo.FullName, }, - }, nil) + }, &models.ProjectLock{}, nil) tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) dlc := events.DefaultDeleteLockCommand{ Locker: l, @@ -71,7 +71,7 @@ func TestDeleteLock_Success(t *testing.T) { WorkingDirLocker: workingDirLocker, WorkingDir: workingDir, } - lock, err := dlc.DeleteLock("id") + lock, _, err := dlc.DeleteLock("id") Ok(t, err) Assert(t, lock != nil, "lock was nil") workingDir.VerifyWasCalledOnce().DeletePlan(pull.BaseRepo, pull, workspace, path, projectName) @@ -84,13 +84,13 @@ func TestDeleteLocksByPull_LockerErr(t *testing.T) { RegisterMockTestingT(t) l := lockmocks.NewMockLocker() workingDir := events.NewMockWorkingDir() - When(l.UnlockByPull(repoName, pullNum)).ThenReturn(nil, errors.New("err")) + When(l.UnlockByPull(repoName, pullNum, true)).ThenReturn(nil, nil, errors.New("err")) dlc := events.DefaultDeleteLockCommand{ Locker: l, Logger: logging.NewNoopLogger(t), WorkingDir: workingDir, } - _, err := dlc.DeleteLocksByPull(repoName, pullNum) + _, _, err := dlc.DeleteLocksByPull(repoName, pullNum) ErrEquals(t, "err", err) workingDir.VerifyWasCalled(Never()).DeletePlan(Any[models.Repo](), Any[models.PullRequest](), Any[string](), Any[string](), Any[string]()) } @@ -102,13 +102,13 @@ func TestDeleteLocksByPull_None(t *testing.T) { RegisterMockTestingT(t) l := lockmocks.NewMockLocker() workingDir := events.NewMockWorkingDir() - When(l.UnlockByPull(repoName, pullNum)).ThenReturn([]models.ProjectLock{}, nil) + When(l.UnlockByPull(repoName, pullNum, true)).ThenReturn([]models.ProjectLock{}, nil, nil) dlc := events.DefaultDeleteLockCommand{ Locker: l, Logger: logging.NewNoopLogger(t), WorkingDir: workingDir, } - _, err := dlc.DeleteLocksByPull(repoName, pullNum) + _, _, err := dlc.DeleteLocksByPull(repoName, pullNum) Ok(t, err) workingDir.VerifyWasCalled(Never()).DeletePlan(Any[models.Repo](), Any[models.PullRequest](), Any[string](), Any[string](), Any[string]()) } @@ -119,11 +119,11 @@ func TestDeleteLocksByPull_OldFormat(t *testing.T) { pullNum := 2 RegisterMockTestingT(t) l := lockmocks.NewMockLocker() - When(l.UnlockByPull(repoName, pullNum)).ThenReturn([]models.ProjectLock{{}}, nil) + When(l.UnlockByPull(repoName, pullNum, true)).ThenReturn([]models.ProjectLock{{}}, nil, nil) dlc := events.DefaultDeleteLockCommand{ Locker: l, Logger: logging.NewNoopLogger(t), } - _, err := dlc.DeleteLocksByPull(repoName, pullNum) + _, _, err := dlc.DeleteLocksByPull(repoName, pullNum) Ok(t, err) } diff --git a/server/events/mocks/mock_delete_lock_command.go b/server/events/mocks/mock_delete_lock_command.go index ce1afd3b72..f5068444f2 100644 --- a/server/events/mocks/mock_delete_lock_command.go +++ b/server/events/mocks/mock_delete_lock_command.go @@ -25,42 +25,50 @@ func NewMockDeleteLockCommand(options ...pegomock.Option) *MockDeleteLockCommand func (mock *MockDeleteLockCommand) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockDeleteLockCommand) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockDeleteLockCommand) DeleteLock(id string) (*models.ProjectLock, error) { +func (mock *MockDeleteLockCommand) DeleteLock(id string) (*models.ProjectLock, *models.ProjectLock, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockDeleteLockCommand().") } params := []pegomock.Param{id} - result := pegomock.GetGenericMockFrom(mock).Invoke("DeleteLock", params, []reflect.Type{reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + result := pegomock.GetGenericMockFrom(mock).Invoke("DeleteLock", params, []reflect.Type{reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((**models.ProjectLock)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 *models.ProjectLock - var ret1 error + var ret1 *models.ProjectLock + var ret2 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].(*models.ProjectLock) } if result[1] != nil { - ret1 = result[1].(error) + ret1 = result[1].(*models.ProjectLock) + } + if result[2] != nil { + ret2 = result[2].(error) } } - return ret0, ret1 + return ret0, ret1, ret2 } -func (mock *MockDeleteLockCommand) DeleteLocksByPull(repoFullName string, pullNum int) (int, error) { +func (mock *MockDeleteLockCommand) DeleteLocksByPull(repoFullName string, pullNum int) (int, *models.DequeueStatus, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockDeleteLockCommand().") } params := []pegomock.Param{repoFullName, pullNum} - result := pegomock.GetGenericMockFrom(mock).Invoke("DeleteLocksByPull", params, []reflect.Type{reflect.TypeOf((*int)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + result := pegomock.GetGenericMockFrom(mock).Invoke("DeleteLocksByPull", params, []reflect.Type{reflect.TypeOf((*int)(nil)).Elem(), reflect.TypeOf((**models.DequeueStatus)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 int - var ret1 error + var ret1 *models.DequeueStatus + var ret2 error if len(result) != 0 { if result[0] != nil { ret0 = result[0].(int) } if result[1] != nil { - ret1 = result[1].(error) + ret1 = result[1].(*models.DequeueStatus) + } + if result[2] != nil { + ret2 = result[2].(error) } } - return ret0, ret1 + return ret0, ret1, ret2 } func (mock *MockDeleteLockCommand) VerifyWasCalledOnce() *VerifierMockDeleteLockCommand { diff --git a/server/events/models/models.go b/server/events/models/models.go index 492fc36fea..23e43fad21 100644 --- a/server/events/models/models.go +++ b/server/events/models/models.go @@ -660,3 +660,47 @@ func NewPlanSuccessStats(output string) PlanSuccessStats { return s } + +type ProjectLockQueue []ProjectLock + +func (q ProjectLockQueue) FindPullRequest(pullRequestNumber int) int { + for i := range q { + if q[i].Pull.Num == pullRequestNumber { + return i + } + } + return -1 +} + +// Dequeue dequeues the next item and returns the dequeued lock and the new queue. +// if the queue is empty, returns nil and the current queue +func (q ProjectLockQueue) Dequeue() (*ProjectLock, ProjectLockQueue) { + if len(q) == 0 { + return nil, q + } + dequeuedLock := &q[0] + newQueue := q[1:] + return dequeuedLock, newQueue +} + +type EnqueueStatusType int + +const ( + // Enqueued means the ProjectLock entered the queue + Enqueued EnqueueStatusType = iota + + // AlreadyInTheQueue means the ProjectLock is already in the queue + AlreadyInTheQueue +) + +type EnqueueStatus struct { + // Status is the status of the enqueue operation + Status EnqueueStatusType + // QueueDepth tells how many PRs are in line before the current one + QueueDepth int +} + +type DequeueStatus struct { + // the PR's lock that should be planned next + ProjectLocks []ProjectLock +} diff --git a/server/events/models/models_test.go b/server/events/models/models_test.go index 126d89c60a..c69381c0b1 100644 --- a/server/events/models/models_test.go +++ b/server/events/models/models_test.go @@ -706,3 +706,42 @@ func TestPlanSuccessStats(t *testing.T) { }) } } + +func TestProjectQueue_FindPullRequest(t *testing.T) { + queue := models.ProjectLockQueue{ + {Pull: models.PullRequest{Num: 15}}, + {Pull: models.PullRequest{Num: 16}}, + {Pull: models.PullRequest{Num: 17}}, + } + Equals(t, 0, queue.FindPullRequest(15)) + Equals(t, 1, queue.FindPullRequest(16)) + Equals(t, 2, queue.FindPullRequest(17)) + Equals(t, -1, queue.FindPullRequest(20)) + + emptyQueue := models.ProjectLockQueue{} + Equals(t, -1, emptyQueue.FindPullRequest(15)) +} + +func TestProjectQueue_Dequeue(t *testing.T) { + queue := models.ProjectLockQueue{ + {Pull: models.PullRequest{Num: 15}}, + {Pull: models.PullRequest{Num: 16}}, + {Pull: models.PullRequest{Num: 17}}, + } + + dequeuedLock, newQueue := queue.Dequeue() + Equals(t, 15, dequeuedLock.Pull.Num) + Equals(t, 2, len(newQueue)) + + dequeuedLock, newQueue = newQueue.Dequeue() + Equals(t, 16, dequeuedLock.Pull.Num) + Equals(t, 1, len(newQueue)) + + dequeuedLock, newQueue = newQueue.Dequeue() + Equals(t, 17, dequeuedLock.Pull.Num) + Equals(t, 0, len(newQueue)) + + dequeuedLock, newQueue = newQueue.Dequeue() + Assert(t, dequeuedLock == nil, "dequeued lock was not nil") + Equals(t, 0, len(newQueue)) +} diff --git a/server/events/plan_command_runner.go b/server/events/plan_command_runner.go index 9313f14d4c..4c4ffad80d 100644 --- a/server/events/plan_command_runner.go +++ b/server/events/plan_command_runner.go @@ -121,7 +121,7 @@ func (p *PlanCommandRunner) runAutoplan(ctx *command.Context) { // discard previous plans that might not be relevant anymore ctx.Log.Debug("deleting previous plans and locks") p.deletePlans(ctx) - _, err = p.lockingLocker.UnlockByPull(baseRepo.FullName, pull.Num) + _, _, err = p.lockingLocker.UnlockByPull(baseRepo.FullName, pull.Num, true) if err != nil { ctx.Log.Err("deleting locks: %s", err) } @@ -241,7 +241,7 @@ func (p *PlanCommandRunner) run(ctx *command.Context, cmd *CommentCommand) { if !cmd.IsForSpecificProject() { ctx.Log.Debug("deleting previous plans and locks") p.deletePlans(ctx) - _, err = p.lockingLocker.UnlockByPull(baseRepo.FullName, pull.Num) + _, _, err = p.lockingLocker.UnlockByPull(baseRepo.FullName, pull.Num, false) if err != nil { ctx.Log.Err("deleting locks: %s", err) } diff --git a/server/events/plan_command_runner_test.go b/server/events/plan_command_runner_test.go index 80ed066d7f..7229cfa3db 100644 --- a/server/events/plan_command_runner_test.go +++ b/server/events/plan_command_runner_test.go @@ -77,7 +77,7 @@ func TestPlanCommandRunner_IsSilenced(t *testing.T) { t.Run(c.Description, func(t *testing.T) { // create an empty DB tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) vcsClient := setup(t, func(tc *TestConfig) { @@ -459,7 +459,7 @@ func TestPlanCommandRunner_ExecutionOrder(t *testing.T) { // vcsClient := setup(t) tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) vcsClient := setup(t, func(tc *TestConfig) { @@ -699,7 +699,7 @@ func TestPlanCommandRunner_AtlantisApplyStatus(t *testing.T) { t.Run(c.Description, func(t *testing.T) { // create an empty DB tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) vcsClient := setup(t, func(tc *TestConfig) { diff --git a/server/events/project_locker.go b/server/events/project_locker.go index 275b9e8c72..4a24867ebf 100644 --- a/server/events/project_locker.go +++ b/server/events/project_locker.go @@ -74,10 +74,15 @@ func (p *DefaultProjectLocker) TryLock(log logging.SimpleLogging, pull models.Pu if err != nil { return nil, err } - failureMsg := fmt.Sprintf( - "This project is currently locked by an unapplied plan from pull %s. To continue, delete the lock from %s or apply that plan and merge the pull request.\n\nOnce the lock is released, comment `atlantis plan` here to re-plan.", - link, - link) + var failureMsg string + if lockAttempt.EnqueueStatus == nil { + failureMsg = fmt.Sprintf( + "This project is currently locked by an unapplied plan from pull %s. To continue, delete the lock from %s or apply that plan and merge the pull request.\n\nOnce the lock is released, comment `atlantis plan` here to re-plan.", + link, + link) + } else { + failureMsg = generateMessageWithQueueStatus(link, lockAttempt) + } return &TryLockResponse{ LockAcquired: false, LockFailureReason: failureMsg, @@ -87,9 +92,22 @@ func (p *DefaultProjectLocker) TryLock(log logging.SimpleLogging, pull models.Pu return &TryLockResponse{ LockAcquired: true, UnlockFn: func() error { - _, err := p.Locker.Unlock(lockAttempt.LockKey) + // TODO(Ghais) this will be called if there was a plan error and the lock was automatically dropped; + // Should we assure dequeuing of the next PR here too? + _, _, err := p.Locker.Unlock(lockAttempt.LockKey, false) return err }, LockKey: lockAttempt.LockKey, }, nil } + +func generateMessageWithQueueStatus(link string, lockAttempt locking.TryLockResponse) string { + failureMsg := fmt.Sprintf("This project is currently locked by an unapplied plan from pull %s.", link) + switch lockAttempt.EnqueueStatus.Status { + case models.Enqueued: + failureMsg = fmt.Sprintf("%s This PR entered the waiting queue :clock130:, number of PRs ahead of you: **%d**.", failureMsg, lockAttempt.EnqueueStatus.QueueDepth) + case models.AlreadyInTheQueue: + failureMsg = fmt.Sprintf("%s This PR is already in the queue :clock130:, number of PRs ahead of you: **%d**.", failureMsg, lockAttempt.EnqueueStatus.QueueDepth) + } + return failureMsg +} diff --git a/server/events/project_locker_test.go b/server/events/project_locker_test.go index 62be1c40f9..8c0722cf7c 100644 --- a/server/events/project_locker_test.go +++ b/server/events/project_locker_test.go @@ -62,6 +62,84 @@ func TestDefaultProjectLocker_TryLockWhenLocked(t *testing.T) { }, res) } +func TestDefaultProjectLocker_TryLockWhenLockedAndEnqueued(t *testing.T) { + var githubClient *vcs.GithubClient + mockClient := vcs.NewClientProxy(githubClient, nil, nil, nil, nil) + mockLocker := mocks.NewMockLocker() + locker := events.DefaultProjectLocker{ + Locker: mockLocker, + VCSClient: mockClient, + } + expProject := models.Project{} + expWorkspace := "default" + expPull := models.PullRequest{} + expUser := models.User{} + + lockingPull := models.PullRequest{ + Num: 2, + } + When(mockLocker.TryLock(expProject, expWorkspace, expPull, expUser)).ThenReturn( + locking.TryLockResponse{ + LockAcquired: false, + CurrLock: models.ProjectLock{ + Pull: lockingPull, + }, + EnqueueStatus: &models.EnqueueStatus{ + Status: models.Enqueued, + QueueDepth: 1, + }, + LockKey: "", + }, + nil, + ) + res, err := locker.TryLock(logging.NewNoopLogger(t), expPull, expUser, expWorkspace, expProject, true) + link, _ := mockClient.MarkdownPullLink(lockingPull) + Ok(t, err) + Equals(t, &events.TryLockResponse{ + LockAcquired: false, + LockFailureReason: fmt.Sprintf("This project is currently locked by an unapplied plan from pull %s. This PR entered the waiting queue :clock130:, number of PRs ahead of you: **%d**.", link, 1), + }, res) +} + +func TestDefaultProjectLocker_TryLockWhenLockedAndAlreadyInQueue(t *testing.T) { + var githubClient *vcs.GithubClient + mockClient := vcs.NewClientProxy(githubClient, nil, nil, nil, nil) + mockLocker := mocks.NewMockLocker() + locker := events.DefaultProjectLocker{ + Locker: mockLocker, + VCSClient: mockClient, + } + expProject := models.Project{} + expWorkspace := "default" + expPull := models.PullRequest{} + expUser := models.User{} + + lockingPull := models.PullRequest{ + Num: 2, + } + When(mockLocker.TryLock(expProject, expWorkspace, expPull, expUser)).ThenReturn( + locking.TryLockResponse{ + LockAcquired: false, + CurrLock: models.ProjectLock{ + Pull: lockingPull, + }, + EnqueueStatus: &models.EnqueueStatus{ + Status: models.AlreadyInTheQueue, + QueueDepth: 1, + }, + LockKey: "", + }, + nil, + ) + res, err := locker.TryLock(logging.NewNoopLogger(t), expPull, expUser, expWorkspace, expProject, true) + link, _ := mockClient.MarkdownPullLink(lockingPull) + Ok(t, err) + Equals(t, &events.TryLockResponse{ + LockAcquired: false, + LockFailureReason: fmt.Sprintf("This project is currently locked by an unapplied plan from pull %s. This PR is already in the queue :clock130:, number of PRs ahead of you: **%d**.", link, 1), + }, res) +} + func TestDefaultProjectLocker_TryLockWhenLockedSamePull(t *testing.T) { RegisterMockTestingT(t) var githubClient *vcs.GithubClient @@ -95,10 +173,10 @@ func TestDefaultProjectLocker_TryLockWhenLockedSamePull(t *testing.T) { Equals(t, true, res.LockAcquired) // UnlockFn should work. - mockLocker.VerifyWasCalled(Never()).Unlock(lockKey) + mockLocker.VerifyWasCalled(Never()).Unlock(lockKey, false) err = res.UnlockFn() Ok(t, err) - mockLocker.VerifyWasCalledOnce().Unlock(lockKey) + mockLocker.VerifyWasCalledOnce().Unlock(lockKey, false) } func TestDefaultProjectLocker_TryLockUnlocked(t *testing.T) { @@ -134,10 +212,10 @@ func TestDefaultProjectLocker_TryLockUnlocked(t *testing.T) { Equals(t, true, res.LockAcquired) // UnlockFn should work. - mockLocker.VerifyWasCalled(Never()).Unlock(lockKey) + mockLocker.VerifyWasCalled(Never()).Unlock(lockKey, false) err = res.UnlockFn() Ok(t, err) - mockLocker.VerifyWasCalledOnce().Unlock(lockKey) + mockLocker.VerifyWasCalledOnce().Unlock(lockKey, false) } func TestDefaultProjectLocker_RepoLocking(t *testing.T) { diff --git a/server/events/pull_closed_executor.go b/server/events/pull_closed_executor.go index de3e4e8009..1c58a990cb 100644 --- a/server/events/pull_closed_executor.go +++ b/server/events/pull_closed_executor.go @@ -97,6 +97,7 @@ func (p *PullClosedExecutor) CleanUpPull(repo models.Repo, pull models.PullReque } } + // TODO(Ghais) extend the tests if err := p.WorkingDir.Delete(repo, pull); err != nil { return errors.Wrap(err, "cleaning workspace") } @@ -104,7 +105,7 @@ func (p *PullClosedExecutor) CleanUpPull(repo models.Repo, pull models.PullReque // Finally, delete locks. We do this last because when someone // unlocks a project, right now we don't actually delete the plan // so we might have plans laying around but no locks. - locks, err := p.Locker.UnlockByPull(repo.FullName, pull.Num) + locks, dequeueStatus, err := p.Locker.UnlockByPull(repo.FullName, pull.Num, true) if err != nil { return errors.Wrap(err, "cleaning up locks") } @@ -124,7 +125,27 @@ func (p *PullClosedExecutor) CleanUpPull(repo models.Repo, pull models.PullReque if err = pullClosedTemplate.Execute(&buf, templateData); err != nil { return errors.Wrap(err, "rendering template for comment") } - return p.VCSClient.CreateComment(repo, pull.Num, buf.String(), "") + + if err = p.VCSClient.CreateComment(repo, pull.Num, buf.String(), ""); err != nil { + return err + } + + if dequeueStatus != nil { + return p.commentOnDequeuedPullRequests(*dequeueStatus) + } + + return nil +} + +func (p *PullClosedExecutor) commentOnDequeuedPullRequests(dequeueStatus models.DequeueStatus) error { + locksByPullRequest := groupByPullRequests(dequeueStatus.ProjectLocks) + for pullRequestNumber, projectLocks := range locksByPullRequest { + planVcsMessage := buildCommentOnDequeuedPullRequest(projectLocks) + if err := p.VCSClient.CreateComment(projectLocks[0].Pull.BaseRepo, pullRequestNumber, planVcsMessage, ""); err != nil { + return errors.Wrapf(err, "unable to comment on PR %d", pullRequestNumber) + } + } + return nil } // buildTemplateData formats the lock data into a slice that can easily be diff --git a/server/events/pull_closed_executor_test.go b/server/events/pull_closed_executor_test.go index 77c1544be7..8066a0d37a 100644 --- a/server/events/pull_closed_executor_test.go +++ b/server/events/pull_closed_executor_test.go @@ -41,7 +41,7 @@ func TestCleanUpPullWorkspaceErr(t *testing.T) { RegisterMockTestingT(t) w := mocks.NewMockWorkingDir() tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) pce := events.PullClosedExecutor{ WorkingDir: w, @@ -60,7 +60,7 @@ func TestCleanUpPullUnlockErr(t *testing.T) { w := mocks.NewMockWorkingDir() l := lockmocks.NewMockLocker() tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) pce := events.PullClosedExecutor{ Locker: l, @@ -69,7 +69,7 @@ func TestCleanUpPullUnlockErr(t *testing.T) { PullClosedTemplate: &events.PullClosedEventTemplate{}, } err = errors.New("err") - When(l.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num)).ThenReturn(nil, err) + When(l.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num, true)).ThenReturn(nil, nil, err) actualErr := pce.CleanUpPull(testdata.GithubRepo, testdata.Pull) Equals(t, "cleaning up locks: err", actualErr.Error()) } @@ -81,7 +81,7 @@ func TestCleanUpPullNoLocks(t *testing.T) { l := lockmocks.NewMockLocker() cp := vcsmocks.NewMockClient() tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) pce := events.PullClosedExecutor{ Locker: l, @@ -89,7 +89,7 @@ func TestCleanUpPullNoLocks(t *testing.T) { WorkingDir: w, Backend: db, } - When(l.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num)).ThenReturn(nil, nil) + When(l.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num, true)).ThenReturn(nil, nil, nil) err = pce.CleanUpPull(testdata.GithubRepo, testdata.Pull) Ok(t, err) cp.VerifyWasCalled(Never()).CreateComment(Any[models.Repo](), Any[int](), Any[string](), Any[string]()) @@ -166,7 +166,7 @@ func TestCleanUpPullComments(t *testing.T) { cp := vcsmocks.NewMockClient() l := lockmocks.NewMockLocker() tmp := t.TempDir() - db, err := db.New(tmp) + db, err := db.New(tmp, false) Ok(t, err) pce := events.PullClosedExecutor{ Locker: l, @@ -175,7 +175,7 @@ func TestCleanUpPullComments(t *testing.T) { Backend: db, } t.Log("testing: " + c.Description) - When(l.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num)).ThenReturn(c.Locks, nil) + When(l.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num, true)).ThenReturn(c.Locks, nil, nil) err = pce.CleanUpPull(testdata.GithubRepo, testdata.Pull) Ok(t, err) _, _, comment, _ := cp.VerifyWasCalledOnce().CreateComment(Any[models.Repo](), Any[int](), Any[string](), Any[string]()).GetCapturedArguments() @@ -230,7 +230,7 @@ func TestCleanUpLogStreaming(t *testing.T) { }); err != nil { panic(errors.Wrap(err, "could not create bucket")) } - db, _ := db.NewWithDB(boltDB, lockBucket, configBucket) + db, _ := db.NewWithDB(boltDB, lockBucket, configBucket, false) result := []command.ProjectResult{ { RepoRelDir: testdata.GithubRepo.FullName, @@ -264,7 +264,7 @@ func TestCleanUpLogStreaming(t *testing.T) { Workspace: "default", }, } - When(locker.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num)).ThenReturn(locks, nil) + When(locker.UnlockByPull(testdata.GithubRepo.FullName, testdata.Pull.Num, true)).ThenReturn(locks, nil, nil) // Clean up. err = pullClosedExecutor.CleanUpPull(testdata.GithubRepo, testdata.Pull) diff --git a/server/events/unlock_command_runner.go b/server/events/unlock_command_runner.go index 1a7407397d..edace181e0 100644 --- a/server/events/unlock_command_runner.go +++ b/server/events/unlock_command_runner.go @@ -1,8 +1,11 @@ package events import ( + "fmt" "github.com/runatlantis/atlantis/server/events/command" + "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/vcs" + "strings" ) func NewUnlockCommandRunner( @@ -33,7 +36,7 @@ func (u *UnlockCommandRunner) Run( pullNum := ctx.Pull.Num vcsMessage := "All Atlantis locks for this PR have been unlocked and plans discarded" - numLocks, err := u.deleteLockCommand.DeleteLocksByPull(baseRepo.FullName, pullNum) + numLocks, dequeueStatus, err := u.deleteLockCommand.DeleteLocksByPull(baseRepo.FullName, pullNum) if err != nil { vcsMessage = "Failed to delete PR locks" ctx.Log.Err("failed to delete locks by pull %s", err.Error()) @@ -45,6 +48,42 @@ func (u *UnlockCommandRunner) Run( } if commentErr := u.vcsClient.CreateComment(baseRepo, pullNum, vcsMessage, command.Unlock.String()); commentErr != nil { - ctx.Log.Err("unable to comment: %s", commentErr) + ctx.Log.Err("unable to comment on PR %s: %s", pullNum, commentErr) } + + if dequeueStatus != nil { + u.commentOnDequeuedPullRequests(ctx, *dequeueStatus) + } +} + +func (u *UnlockCommandRunner) commentOnDequeuedPullRequests(ctx *command.Context, dequeueStatus models.DequeueStatus) { + locksByPullRequest := groupByPullRequests(dequeueStatus.ProjectLocks) + for pullRequestNumber, projectLocks := range locksByPullRequest { + planVcsMessage := buildCommentOnDequeuedPullRequest(projectLocks) + if commentErr := u.vcsClient.CreateComment(projectLocks[0].Pull.BaseRepo, pullRequestNumber, planVcsMessage, ""); commentErr != nil { + ctx.Log.Err("unable to comment on PR %d: %s", pullRequestNumber, commentErr) + } + } +} + +func groupByPullRequests(projectLocks []models.ProjectLock) map[int][]models.ProjectLock { + result := make(map[int][]models.ProjectLock) + for _, lock := range projectLocks { + result[lock.Pull.Num] = append(result[lock.Pull.Num], lock) + } + return result +} + +func buildCommentOnDequeuedPullRequest(projectLocks []models.ProjectLock) string { + var releasedLocksMessages []string + for _, lock := range projectLocks { + releasedLocksMessages = append(releasedLocksMessages, fmt.Sprintf("* dir: `%s` workspace: `%s`", lock.Project.Path, lock.Workspace)) + } + + // stick to the first User for now, if needed, create a list of unique users and mention them all + lockCreatorMention := "@" + projectLocks[0].User.Username + releasedLocksMessage := strings.Join(releasedLocksMessages, "\n") + + return fmt.Sprintf("%s\nThe following locks have been aquired by this PR and can now be planned:\n%s", + lockCreatorMention, releasedLocksMessage) } diff --git a/server/server.go b/server/server.go index 91aca3be87..a0caca2bca 100644 --- a/server/server.go +++ b/server/server.go @@ -435,13 +435,13 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { switch dbtype := userConfig.LockingDBType; dbtype { case "redis": logger.Info("Utilizing Redis DB") - backend, err = redis.New(userConfig.RedisHost, userConfig.RedisPort, userConfig.RedisPassword, userConfig.RedisTLSEnabled, userConfig.RedisInsecureSkipVerify, userConfig.RedisDB) + backend, err = redis.New(userConfig.RedisHost, userConfig.RedisPort, userConfig.RedisPassword, userConfig.RedisTLSEnabled, userConfig.RedisInsecureSkipVerify, userConfig.RedisDB, userConfig.QueueEnabled) if err != nil { return nil, err } case "boltdb": logger.Info("Utilizing BoltDB") - backend, err = db.New(userConfig.DataDir) + backend, err = db.New(userConfig.DataDir, userConfig.QueueEnabled) if err != nil { return nil, err } @@ -1032,6 +1032,8 @@ func (s *Server) Index(w http.ResponseWriter, _ *http.Request) { var lockResults []templates.LockIndexData for id, v := range locks { lockURL, _ := s.Router.Get(LockViewRouteName).URL("id", url.QueryEscape(id)) + queue, _ := s.Locker.GetQueueByLock(v.Project, v.Workspace) + queueIndexDataList := controllers.GetQueueItemIndexData(queue) lockResults = append(lockResults, templates.LockIndexData{ // NOTE: must use .String() instead of .Path because we need the // query params as part of the lock URL. @@ -1043,6 +1045,7 @@ func (s *Server) Index(w http.ResponseWriter, _ *http.Request) { Workspace: v.Workspace, Time: v.Time, TimeFormatted: v.Time.Format("02-01-2006 15:04:05"), + Queue: queueIndexDataList, }) } diff --git a/server/static/css/custom.css b/server/static/css/custom.css index 644131d328..bd5644dcaf 100644 --- a/server/static/css/custom.css +++ b/server/static/css/custom.css @@ -238,7 +238,7 @@ tbody { /* Styles for the lock index */ .lock-grid{ display: grid; - grid-template-columns: auto auto auto auto auto auto; + grid-template-columns: auto auto auto auto auto auto auto; border: 1px solid #dbeaf4; width: 100%; font-size: 12px; @@ -350,7 +350,7 @@ tbody { /* Add Animation */ @-webkit-keyframes animatetop { - from {top: -300px; opacity: 0} + from {top: -300px; opacity: 0} to {top: 0; opacity: 1} } @@ -369,7 +369,7 @@ tbody { .title-heading { font-family: monospace, monospace; font-size: 1.1em; text-align: center; - } + } .small { font-size: 1.0em; diff --git a/server/user_config.go b/server/user_config.go index 209482049e..a38dbee6a7 100644 --- a/server/user_config.go +++ b/server/user_config.go @@ -70,6 +70,7 @@ type UserConfig struct { PlanDrafts bool `mapstructure:"allow-draft-prs"` Port int `mapstructure:"port"` QuietPolicyChecks bool `mapstructure:"quiet-policy-checks"` + QueueEnabled bool `mapstructure:"queue-enabled"` RedisDB int `mapstructure:"redis-db"` RedisHost string `mapstructure:"redis-host"` RedisPassword string `mapstructure:"redis-password"`