Skip to content

Commit

Permalink
feat: Lock single job for a project
Browse files Browse the repository at this point in the history
  • Loading branch information
JanCizmar committed Jul 26, 2023
1 parent 40b4f6b commit 784e8c0
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import io.tolgee.exceptions.OutOfCreditsException
import io.tolgee.fixtures.waitFor
import io.tolgee.fixtures.waitForNotThrowing
import io.tolgee.model.batch.BatchJob
import io.tolgee.model.batch.BatchJobChunkExecution
import io.tolgee.model.batch.BatchJobChunkExecutionStatus
import io.tolgee.model.batch.BatchJobStatus
import io.tolgee.security.JwtTokenProvider
import io.tolgee.testing.WebsocketTest
Expand All @@ -21,14 +23,17 @@ import kotlinx.coroutines.ensureActive
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.eq
import org.mockito.Mockito
import org.mockito.kotlin.any
import org.mockito.kotlin.argThat
import org.mockito.kotlin.timeout
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.boot.test.mock.mockito.SpyBean
import org.springframework.boot.test.web.server.LocalServerPort
import org.springframework.test.annotation.DirtiesContext
import java.util.*
Expand Down Expand Up @@ -72,6 +77,13 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {
@Autowired
lateinit var batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue

@Autowired
lateinit var batchJobConcurrentLauncher: BatchJobConcurrentLauncher

@Autowired
@SpyBean
lateinit var batchJobProjectLockingManager: BatchJobProjectLockingManager

@BeforeEach
fun setup() {
batchJobChunkExecutionQueue.clear()
Expand Down Expand Up @@ -345,18 +357,99 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {

batchJobCancellationManager.cancel(job.id)

job.waitForCompleted().status.assert.isEqualTo(BatchJobStatus.CANCELLED)

websocketHelper.receivedMessages.assert.hasSizeGreaterThan(49)
websocketHelper.receivedMessages.last.contains("CANCELLED")
}

/**
 * Checks the "one running batch job per project" rule.
 *
 * Two jobs are started in the same project and their chunk executions are fed to the
 * queue manually, one at a time. While job1 holds the project lock, an execution of job2
 * must stay PENDING; once job1's executions are done, job2 must take over the lock and
 * both jobs must finish with SUCCESS.
 */
@Test
fun `it locks the single job for project`() {
// Pause the launcher while both jobs are created
// NOTE(review): presumably this stops the launcher from consuming the queue — confirm in BatchJobConcurrentLauncher
batchJobConcurrentLauncher.pause = true

val job1 = runChunkedJob(20)
val job2 = runChunkedJob(20)

val executions = getExecutions(listOf(job1.id, job2.id))

// The test uses the executions at index 0 and 1 of each job, so each job has to produce at least two
// chunk executions. The names reflect the order in which they are queued below.
val firstExecution = executions[job1.id]!!.first()
val secondExecution = executions[job2.id]!!.first()
val thirdExecution = executions[job1.id]!![1]
val fourthExecution = executions[job2.id]!![1]

// Empty the queue so that only the executions explicitly added below get processed
batchJobChunkExecutionQueue.clear()

batchJobConcurrentLauncher.pause = false

batchJobChunkExecutionQueue.addToQueue(listOf(firstExecution))

waitFor(pollTime = 1000) {
batchJobService.getExecution(firstExecution.id).status == BatchJobChunkExecutionStatus.SUCCESS
}
// The first job is now locked
batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).assert.isEqualTo(job1.id)

batchJobChunkExecutionQueue.addToQueue(listOf(secondExecution))

// it tries to lock the second job but it can't since the first job is locked
verify(batchJobProjectLockingManager, timeout(1000).times(1))
.canRunBatchJobOfExecution(
argThat {
this.batchJob.id == job2.id
}
)

// it doesn't run the second execution since the first job is locked
// (negative check: give the launcher a fixed 1 s window in which it must NOT process the execution)
Thread.sleep(1000)
batchJobService.getExecution(secondExecution.id).status.assert.isEqualTo(BatchJobChunkExecutionStatus.PENDING)

batchJobChunkExecutionQueue.addToQueue(listOf(thirdExecution))

// second and last execution of job1 is done, so the second job is locked now
waitFor(pollTime = 1000) {
batchJobService.getExecution(thirdExecution.id).status == BatchJobChunkExecutionStatus.SUCCESS
}

waitForNotThrowing {
// the project was unlocked (exactly once) before job2 acquired the lock
verify(batchJobProjectLockingManager, times(1)).unlockJobForProject(eq(job1.project.id))
}

// job2's pending execution is expected to be picked up again, which makes job2 the lock holder
waitForNotThrowing(pollTime = 1000) {
batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).assert.isEqualTo(job2.id)
}

batchJobChunkExecutionQueue.addToQueue(listOf(fourthExecution))

// Wait until job2's last execution succeeded and the project lock was released again
// NOTE(review): 0L appears to stand for "no job holds the lock" — confirm in BatchJobProjectLockingManager
waitFor(pollTime = 1000) {
batchJobService.getExecution(fourthExecution.id).status == BatchJobChunkExecutionStatus.SUCCESS &&
batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id) == 0L
}

batchJobService.getJobDto(job1.id).status.assert.isEqualTo(BatchJobStatus.SUCCESS)
batchJobService.getJobDto(job2.id).status.assert.isEqualTo(BatchJobStatus.SUCCESS)
}

/**
 * Waits until this job reports a completed status and returns its current DTO.
 *
 * The check runs inside [waitForNotThrowing], so a failing assertion (job not completed yet)
 * is retried; every attempt reads the job in a new transaction.
 *
 * The helper deliberately asserts only that the status is completed. Which final status is
 * expected (e.g. CANCELLED) and any websocket message checks belong to the calling test.
 *
 * @return the DTO of this job, loaded after the completed status was observed
 */
private fun BatchJob.waitForCompleted(): BatchJobDto {
  // captured once so the lambdas below do not depend on what `this` resolves to inside them
  val jobId = this.id

  waitForNotThrowing(pollTime = 1000) {
    executeInNewTransaction {
      val finishedJob = batchJobService.getJobDto(jobId)
      finishedJob.status.completed.assert.isTrue()
    }
  }

  return batchJobService.getJobDto(jobId)
}

protected fun runChunkedJob(keyCount: Int): BatchJob {
/**
 * Loads the chunk executions of the given jobs and groups them by the id of their batch job.
 *
 * The query has no ORDER BY, so the order of executions inside each list is whatever
 * the database returns.
 *
 * @param jobIds ids of the batch jobs whose executions should be loaded
 * @return map from batch job id to the executions of that job
 */
private fun getExecutions(
  jobIds: List<Long>,
): Map<Long, List<BatchJobChunkExecution>> {
  val query = entityManager.createQuery(
    """from BatchJobChunkExecution b where b.batchJob.id in :ids""",
    BatchJobChunkExecution::class.java
  )
  query.setParameter("ids", jobIds)

  val chunkExecutions = query.resultList
  return chunkExecutions.groupBy { chunkExecution -> chunkExecution.batchJob.id }
}

fun runChunkedJob(keyCount: Int): BatchJob {
return executeInNewTransaction {
batchJobService.startJob(
request = BatchTranslateRequest().apply {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class BatchJobActionService(
@Lazy
private val redisTemplate: StringRedisTemplate,
private val concurrentExecutionLauncher: BatchJobConcurrentLauncher,
private val savePointManager: SavePointManager
private val savePointManager: SavePointManager,
private val batchJobProjectLockingManager: BatchJobProjectLockingManager
) : Logging {
companion object {
const val MIN_TIME_BETWEEN_OPERATIONS = 10
Expand Down Expand Up @@ -99,6 +100,7 @@ class BatchJobActionService(
" thrown UnexpectedRollbackException"
)
}

else -> {
logger.error("Job ${executionItem.jobId}: ⚠️ Chunk ${executionItem.chunkExecutionId} thrown error", e)
Sentry.captureException(e)
Expand All @@ -109,7 +111,8 @@ class BatchJobActionService(
}

private fun getPendingUnlockedExecutionItem(executionItem: ExecutionQueueItem): BatchJobChunkExecution? {
val lockedExecution = getExecutionIfCanAcquireLock(executionItem.chunkExecutionId)
val lockedExecution = getExecutionIfCanAcquireLockInDb(executionItem.chunkExecutionId)

if (lockedExecution == null) {
logger.debug("⚠️ Chunk ${executionItem.chunkExecutionId} is locked, skipping")
return null
Expand All @@ -118,7 +121,8 @@ class BatchJobActionService(
logger.debug("⚠️ Chunk ${executionItem.chunkExecutionId} is not pending, skipping")
return null
}
return lockedExecution

return getExecutionIfCanLockJobForProject(lockedExecution)
}

private fun addRetryExecutionToQueue(retryExecution: BatchJobChunkExecution?) {
Expand Down Expand Up @@ -147,12 +151,26 @@ class BatchJobActionService(
}
}

fun getExecutionIfCanAcquireLock(id: Long): BatchJobChunkExecution? {
/**
 * Enforces that only a single job can run in a project at the same time.
 *
 * Asks [batchJobProjectLockingManager] whether the job of the given execution may run.
 * If it may not, the execution is put back to the local queue and `null` is returned.
 *
 * @param execution the chunk execution about to be processed
 * @return the same execution when its job may run, `null` otherwise
 */
private fun getExecutionIfCanLockJobForProject(execution: BatchJobChunkExecution): BatchJobChunkExecution? {
  val jobMayRun = batchJobProjectLockingManager.canRunBatchJobOfExecution(execution)

  if (!jobMayRun) {
    logger.debug("⚠️ Cannot run execution ${execution.id}. Other job from the project is currently running, skipping")

    // consumption of this item has not been published yet, so it is enough to re-add it to the local queue
    batchJobChunkExecutionQueue.addExecutionsToLocalQueue(listOf(execution))
    return null
  }

  return execution
}

private fun getExecutionIfCanAcquireLockInDb(id: Long): BatchJobChunkExecution? {
entityManager.createNativeQuery("""SET enable_seqscan=off""")
return entityManager.createQuery(
"""
from BatchJobChunkExecution bjce
where bjce.id = :id
from BatchJobChunkExecution bjce
where bjce.id = :id
""".trimIndent(),
BatchJobChunkExecution::class.java
)
Expand Down
Loading

0 comments on commit 784e8c0

Please sign in to comment.