Skip to content

Commit

Permalink
chore: Rename queue, clear queue in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JanCizmar committed Jul 17, 2023
1 parent d8ab44c commit 2fbb380
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package io.tolgee.api.v2.controllers.batch

import io.tolgee.ProjectAuthControllerTest
import io.tolgee.batch.BatchJobActionService
import io.tolgee.batch.BatchJobChunkExecutionQueue
import io.tolgee.batch.BatchJobConcurrentLauncher
import io.tolgee.batch.BatchJobDto
import io.tolgee.batch.BatchJobService
import io.tolgee.batch.BatchJobType
import io.tolgee.batch.JobChunkExecutionQueue
import io.tolgee.batch.processors.TranslationChunkProcessor
import io.tolgee.batch.request.BatchTranslateRequest
import io.tolgee.batch.state.BatchJobStateProvider
Expand Down Expand Up @@ -58,7 +58,7 @@ class BatchJobManagementControllerTest : ProjectAuthControllerTest("/v2/projects
lateinit var batchJobStateProvider: BatchJobStateProvider

@Autowired
lateinit var jobChunkExecutionQueue: JobChunkExecutionQueue
lateinit var batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue

@Autowired
lateinit var batchJobConcurrentLauncher: BatchJobConcurrentLauncher
Expand All @@ -69,7 +69,7 @@ class BatchJobManagementControllerTest : ProjectAuthControllerTest("/v2/projects
@BeforeEach
fun setup() {
testData = BatchJobsTestData()
jobChunkExecutionQueue.populateQueue()
batchJobChunkExecutionQueue.populateQueue()
whenever(translationChunkProcessor.getParams(any<BatchTranslateRequest>(), any())).thenCallRealMethod()
whenever(translationChunkProcessor.getTarget(any<BatchTranslateRequest>())).thenCallRealMethod()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.tolgee.api.v2.controllers.batch

import io.tolgee.ProjectAuthControllerTest
import io.tolgee.batch.BatchJobChunkExecutionQueue
import io.tolgee.development.testDataBuilder.data.BatchJobsTestData
import io.tolgee.fixtures.andAssertThatJson
import io.tolgee.fixtures.andIsOk
Expand All @@ -16,6 +17,7 @@ import io.tolgee.testing.assert
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc

@AutoConfigureMockMvc
Expand All @@ -24,8 +26,12 @@ class StartBatchJobControllerTest : ProjectAuthControllerTest("/v2/projects/") {
lateinit var testData: BatchJobsTestData
var fakeBefore = false

@Autowired
lateinit var batchJobOperationQueue: BatchJobChunkExecutionQueue

@BeforeEach
fun setup() {
batchJobOperationQueue.clear()
testData = BatchJobsTestData()
fakeBefore = internalProperties.fakeMtProviders
internalProperties.fakeMtProviders = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,18 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {
lateinit var batchJobCancellationManager: BatchJobCancellationManager

@Autowired
lateinit var jobChunkExecutionQueue: JobChunkExecutionQueue
lateinit var batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue

@BeforeEach
fun setup() {
jobChunkExecutionQueue.clear()
batchJobChunkExecutionQueue.clear()
Mockito.reset(translationChunkProcessor)
Mockito.clearInvocations(translationChunkProcessor)
whenever(translationChunkProcessor.getParams(any(), any())).thenCallRealMethod()
whenever(translationChunkProcessor.getTarget(any())).thenCallRealMethod()
whenever(deleteKeysChunkProcessor.getParams(any(), any())).thenCallRealMethod()
whenever(deleteKeysChunkProcessor.getTarget(any())).thenCallRealMethod()
jobChunkExecutionQueue.populateQueue()
batchJobChunkExecutionQueue.populateQueue()
testData = BatchJobsTestData()
testDataService.saveTestData(testData.root)
currentDateProvider.forcedDate = Date(1687237928000)
Expand All @@ -95,7 +95,7 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {

@AfterEach()
fun teardown() {
jobChunkExecutionQueue.clear()
batchJobChunkExecutionQueue.clear()
currentDateProvider.forcedDate = null
websocketHelper.stop()
}
Expand Down Expand Up @@ -199,7 +199,7 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {

(1..3).forEach {
waitForNotThrowing {
jobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 2000 }.assert.isNotNull
batchJobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 2000 }.assert.isNotNull
}
currentDateProvider.forcedDate = Date(currentDateProvider.date.time + 2000)
}
Expand Down Expand Up @@ -253,19 +253,19 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {
val job = runChunkedJob(1000)

waitForNotThrowing {
jobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 100 }.assert.isNotNull
batchJobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 100 }.assert.isNotNull
}

currentDateProvider.forcedDate = Date(currentDateProvider.date.time + 100)

waitForNotThrowing {
jobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 1000 }.assert.isNotNull
batchJobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 1000 }.assert.isNotNull
}

currentDateProvider.forcedDate = Date(currentDateProvider.date.time + 1000)

waitForNotThrowing {
jobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 10000 }.assert.isNotNull
batchJobChunkExecutionQueue.find { it.executeAfter == currentDateProvider.date.time + 10000 }.assert.isNotNull
}

currentDateProvider.forcedDate = Date(currentDateProvider.date.time + 10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ class BatchJobsGeneralWithRedisTest : AbstractBatchJobsGeneralTest() {
runChunkedJob(keyCount = 200)

waitForNotThrowing {
val peek = jobChunkExecutionQueue.peek()
val peek = batchJobChunkExecutionQueue.peek()
peek.assert.isNotNull
}

val peek = jobChunkExecutionQueue.peek()
jobChunkExecutionQueue.contains(peek).assert.isTrue()
val peek = batchJobChunkExecutionQueue.peek()
batchJobChunkExecutionQueue.contains(peek).assert.isTrue()
Mockito.clearInvocations(redisTemplate)
batchJobActionService.publishRemoveConsuming(peek)
verify(redisTemplate, times(1))
Expand All @@ -82,7 +82,7 @@ class BatchJobsGeneralWithRedisTest : AbstractBatchJobsGeneralTest() {
)
)
waitForNotThrowing(timeout = 2000) {
jobChunkExecutionQueue.contains(peek).assert.isFalse()
batchJobChunkExecutionQueue.contains(peek).assert.isFalse()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BatchJobActionService(
private val progressManager: ProgressManager,
@Lazy
private val batchJobService: BatchJobService,
private val jobChunkExecutionQueue: JobChunkExecutionQueue,
private val batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue,
@Lazy
private val redisTemplate: StringRedisTemplate,
private val concurrentExecutionLauncher: BatchJobConcurrentLauncher
Expand All @@ -44,7 +44,7 @@ class BatchJobActionService(
fun run() {
println("Application ready")
executeInNewTransaction(transactionManager) {
jobChunkExecutionQueue.populateQueue()
batchJobChunkExecutionQueue.populateQueue()
}

concurrentExecutionLauncher.run { executionItem, coroutineContext ->
Expand Down Expand Up @@ -99,7 +99,7 @@ class BatchJobActionService(

private fun addRetryExecutionToQueue(retryExecution: BatchJobChunkExecution?) {
retryExecution?.let {
jobChunkExecutionQueue.addToQueue(listOf(it))
batchJobChunkExecutionQueue.addToQueue(listOf(it))
logger.debug("Job ${it.batchJob.id}: Added chunk ${it.id} for re-trial")
}
}
Expand All @@ -110,7 +110,7 @@ class BatchJobActionService(
} catch (e: Throwable) {
logger.error("Error processing chunk ${executionItem.chunkExecutionId}", e)
Sentry.captureException(e)
jobChunkExecutionQueue.addItemsToLocalQueue(listOf(executionItem))
batchJobChunkExecutionQueue.addItemsToLocalQueue(listOf(executionItem))
null
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ class BatchJobActionService(
}

fun cancelLocalJob(jobId: Long) {
jobChunkExecutionQueue.cancelJob(jobId)
batchJobChunkExecutionQueue.cancelJob(jobId)
concurrentExecutionLauncher.runningJobs.filter { it.value.first == jobId }.forEach {
it.value.second.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import javax.persistence.EntityManager

@Component
class JobChunkExecutionQueue(
class BatchJobChunkExecutionQueue(
private val entityManager: EntityManager,
private val usingRedisProvider: UsingRedisProvider,
@Lazy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlin.coroutines.CoroutineContext
@Component
class BatchJobConcurrentLauncher(
private val batchProperties: BatchProperties,
private val jobChunkExecutionQueue: JobChunkExecutionQueue,
private val batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue,
private val currentDateProvider: CurrentDateProvider
) : Logging {
companion object {
Expand Down Expand Up @@ -68,7 +68,7 @@ class BatchJobConcurrentLauncher(
}

private fun getSleepTime(startTime: Long): Long {
if (!jobChunkExecutionQueue.isEmpty() && jobsToLaunch > 0) {
if (!batchJobChunkExecutionQueue.isEmpty() && jobsToLaunch > 0) {
return 0
}
return BatchJobActionService.MIN_TIME_BETWEEN_OPERATIONS - (System.currentTimeMillis() - startTime)
Expand All @@ -89,7 +89,7 @@ class BatchJobConcurrentLauncher(

logger.trace("Jobs to launch: $jobsToLaunch")
val items = (1..jobsToLaunch)
.mapNotNull { jobChunkExecutionQueue.poll() }
.mapNotNull { batchJobChunkExecutionQueue.poll() }

logItemsPulled(items)

Expand All @@ -108,8 +108,8 @@ class BatchJobConcurrentLauncher(
)
}
logger.debug(
"${jobChunkExecutionQueue.size} is left in the queue (${System.identityHashCode(jobChunkExecutionQueue)}): " +
jobChunkExecutionQueue.joinToString(", ") { it.chunkExecutionId.toString() }
"${batchJobChunkExecutionQueue.size} is left in the queue (${System.identityHashCode(batchJobChunkExecutionQueue)}): " +
batchJobChunkExecutionQueue.joinToString(", ") { it.chunkExecutionId.toString() }
)
}

Expand All @@ -122,7 +122,7 @@ class BatchJobConcurrentLauncher(
"""Execution ${executionItem.chunkExecutionId} not ready to execute, adding back to queue:
| Difference ${executionItem.executeAfter!! - currentDateProvider.date.time}""".trimMargin()
)
jobChunkExecutionQueue.addItemsToLocalQueue(listOf(executionItem))
batchJobChunkExecutionQueue.addItemsToLocalQueue(listOf(executionItem))
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BatchJobService(
private val cachingBatchJobService: CachingBatchJobService,
@Lazy
private val progressManager: ProgressManager,
private val jobChunkExecutionQueue: JobChunkExecutionQueue,
private val batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue,
private val currentDateProvider: CurrentDateProvider,
private val securityService: SecurityService,
private val authenticationFacade: AuthenticationFacade
Expand Down Expand Up @@ -85,11 +85,11 @@ class BatchJobService(
job
}

executions?.let { jobChunkExecutionQueue.addToQueue(it) }
executions?.let { batchJobChunkExecutionQueue.addToQueue(it) }
logger.debug(
"Starting job ${job.id}, aadded ${executions?.size} executions to queue ${
System.identityHashCode(
jobChunkExecutionQueue
batchJobChunkExecutionQueue
)
}"
)
Expand Down

0 comments on commit 2fbb380

Please sign in to comment.