Skip to content

Commit

Permalink
fix to avoid deletion when token already present in JobStore
Browse files Browse the repository at this point in the history
  • Loading branch information
VishnuKarthikRavindran committed Jan 13, 2022
1 parent 45f5bc0 commit 006337a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
3 changes: 2 additions & 1 deletion agent/task/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ func (p *pool) AcquireBufferToken(jobId string) PoolErrorCode {
// removing expired tokens only when the job queue is full
expirationTime := currentTime.Add(time.Duration(-unusedTokenValidityInMinutes) * time.Minute)
for tokenId, tokenTime := range p.tokenHoldingJobIds {
if tokenTime.Before(expirationTime) {
// hasJob condition added to handle long-running commands
if tokenTime.Before(expirationTime) && !p.HasJob(tokenId) {
p.log.Warnf("removing expired token %v from the TokenBuffer", tokenId)
delete(p.tokenHoldingJobIds, tokenId)
}
Expand Down
18 changes: 18 additions & 0 deletions agent/task/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ func TestAcquireToken_ExpiredToken(t *testing.T) {
assert.Equal(t, JobQueueFull, errorCode)
}

// TestAcquireToken_JobAlreadyPresentInJobStore_ShouldNotDeleteToken
// tests expired token deletion when the job already present in Job Store
func TestAcquireToken_JobAlreadyPresentInJobStore_ShouldNotDeleteToken(t *testing.T) {
// basic pool
poolRef := NewPool(logger, 1, 2, 100*time.Millisecond, times.NewMockedClock())
poolObj := poolRef.(*pool)
expirationTime := time.Now().Add(time.Duration(-unusedTokenValidityInMinutes-2) * time.Minute) // added extra expiry
// If buffer full, delete tokens only not in job store
// In this case, no deletion happens
poolObj.tokenHoldingJobIds["job 1"] = &expirationTime
poolObj.jobStore.AddJob("job 1", &JobToken{})
poolObj.tokenHoldingJobIds["job 2"] = &expirationTime
poolObj.jobStore.AddJob("job 2", &JobToken{})

errorCode := poolObj.AcquireBufferToken("job 3")
assert.Equal(t, JobQueueFull, errorCode)
}

// TestReleaseAndToken_BasicTest test basic release and acquire tokens operations
func TestReleaseAndToken_BasicTest(t *testing.T) {
workerLimit := 5
Expand Down

0 comments on commit 006337a

Please sign in to comment.