Skip to content

Commit

Permalink
fix: Exception in chunk run causes full rollback & added tests (#1824)
Browse files Browse the repository at this point in the history
  • Loading branch information
JanCizmar committed Aug 7, 2023
1 parent f3d78cc commit f2aac7c
Show file tree
Hide file tree
Showing 27 changed files with 799 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import io.tolgee.batch.request.BatchTranslateRequest
import io.tolgee.batch.request.ClearTranslationsRequest
import io.tolgee.batch.request.CopyTranslationRequest
import io.tolgee.batch.request.DeleteKeysRequest
import io.tolgee.batch.request.SetKeysNamespaceRequest
import io.tolgee.batch.request.SetTranslationsStateStateRequest
import io.tolgee.batch.request.TagKeysRequest
import io.tolgee.batch.request.UntagKeysRequest
import io.tolgee.constants.Message
import io.tolgee.exceptions.BadRequestException
import io.tolgee.hateoas.batch.BatchJobModel
import io.tolgee.hateoas.batch.BatchJobModelAssembler
import io.tolgee.model.batch.BatchJob
Expand Down Expand Up @@ -112,6 +117,54 @@ class StartBatchJobController(
).model
}

@PostMapping(value = ["/tag-keys"])
@AccessWithApiKey()
@AccessWithProjectPermission(Scope.KEYS_EDIT)
@Operation(summary = "Tag keys")
fun tagKeys(@Valid @RequestBody data: TagKeysRequest): BatchJobModel {
data.tags.validate()
securityService.checkKeyIdsExistAndIsFromProject(data.keyIds, projectHolder.project.id)
return batchJobService.startJob(
data,
projectHolder.projectEntity,
authenticationFacade.userAccountEntity,
BatchJobType.TAG_KEYS
).model
}

@PostMapping(value = ["/untag-keys"])
@AccessWithApiKey()
@AccessWithProjectPermission(Scope.KEYS_EDIT)
@Operation(summary = "Tag keys")
fun untagKeys(@Valid @RequestBody data: UntagKeysRequest): BatchJobModel {
securityService.checkKeyIdsExistAndIsFromProject(data.keyIds, projectHolder.project.id)
return batchJobService.startJob(
data,
projectHolder.projectEntity,
authenticationFacade.userAccountEntity,
BatchJobType.UNTAG_KEYS
).model
}

@PostMapping(value = ["/set-keys-namespace"])
@AccessWithApiKey()
@AccessWithProjectPermission(Scope.KEYS_EDIT)
@Operation(summary = "Tag keys")
fun setKeysNamespace(@Valid @RequestBody data: SetKeysNamespaceRequest): BatchJobModel {
securityService.checkKeyIdsExistAndIsFromProject(data.keyIds, projectHolder.project.id)
return batchJobService.startJob(
data,
projectHolder.projectEntity,
authenticationFacade.userAccountEntity,
BatchJobType.SET_KEYS_NAMESPACE
).model
}

val BatchJob.model
get() = batchJobModelAssembler.toModel(batchJobService.getView(this))

private fun List<String>.validate() {
if (this.any { it.isBlank() }) throw BadRequestException(Message.TAG_IS_BLANK)
if (this.any { it.length > 100 }) throw BadRequestException(Message.TAG_TOO_LOG)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package io.tolgee.api.v2.controllers.batch

import io.tolgee.ProjectAuthControllerTest
import io.tolgee.batch.BatchJobChunkExecutionQueue
import io.tolgee.batch.BatchJobService
import io.tolgee.development.testDataBuilder.data.BatchJobsTestData
import io.tolgee.fixtures.andAssertThatJson
import io.tolgee.fixtures.andIsBadRequest
import io.tolgee.fixtures.andIsOk
import io.tolgee.fixtures.andPrettyPrint
import io.tolgee.fixtures.isValidId
import io.tolgee.fixtures.waitFor
import io.tolgee.fixtures.waitForNotThrowing
import io.tolgee.model.batch.BatchJob
import io.tolgee.model.batch.BatchJobStatus
Expand All @@ -19,6 +23,8 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc
import org.springframework.test.web.servlet.ResultActions
import java.util.function.Consumer

@AutoConfigureMockMvc
@ContextRecreatingTest
Expand All @@ -29,6 +35,9 @@ class StartBatchJobControllerTest : ProjectAuthControllerTest("/v2/projects/") {
@Autowired
lateinit var batchJobOperationQueue: BatchJobChunkExecutionQueue

@Autowired
lateinit var batchJobService: BatchJobService

@BeforeEach
fun setup() {
batchJobOperationQueue.clear()
Expand Down Expand Up @@ -222,4 +231,136 @@ class StartBatchJobControllerTest : ProjectAuthControllerTest("/v2/projects/") {
all.count { it.text?.startsWith("en") == true }.assert.isEqualTo(allKeyIds.size + keyIds.size * 2)
}
}

@Test
@ProjectJWTAuthTestMethod
fun `it validates tag length`() {
performProjectAuthPost(
"start-batch-job/tag-keys",
mapOf(
"keyIds" to listOf(1),
"tags" to listOf("a".repeat(101)),
)
).andIsBadRequest.andPrettyPrint
}

@Test
@ProjectJWTAuthTestMethod
fun `it tags keys`() {
val keyCount = 1000
val keys = testData.addTagKeysData(keyCount)
saveAndPrepare()

val allKeyIds = keys.map { it.id }.toList()
val keyIds = allKeyIds.take(500)
val newTags = listOf("tag1", "tag3", "a-tag", "b-tag")

performProjectAuthPost(
"start-batch-job/tag-keys",
mapOf(
"keyIds" to keyIds,
"tags" to newTags,
)
).andIsOk

waitForNotThrowing(pollTime = 1000, timeout = 10000) {
val all = keyService.getKeysWithTagsById(keyIds)
all.assert.hasSize(keyIds.size)
all.count {
it.keyMeta?.tags?.map { it.name }?.containsAll(newTags) == true
}.assert.isEqualTo(keyIds.size)
}
}

@Test
@ProjectJWTAuthTestMethod
fun `it untags keys`() {
val keyCount = 1000
val keys = testData.addTagKeysData(keyCount)
saveAndPrepare()

val allKeyIds = keys.map { it.id }.toList()
val keyIds = allKeyIds.take(300)
val tagsToRemove = listOf("tag1", "a-tag", "b-tag")

performProjectAuthPost(
"start-batch-job/untag-keys",
mapOf(
"keyIds" to keyIds,
"tags" to tagsToRemove
)
).andIsOk

waitForNotThrowing(pollTime = 1000, timeout = 10000) {
val all = keyService.getKeysWithTagsById(keyIds)
all.assert.hasSize(keyIds.size)
all.count {
it.keyMeta?.tags?.map { it.name }?.any { tagsToRemove.contains(it) } == false &&
it.keyMeta?.tags?.map { it.name }?.contains("tag3") == true
}.assert.isEqualTo(keyIds.size)
}
}

@Test
@ProjectJWTAuthTestMethod
fun `it moves to other namespace`() {
val keys = testData.addNamespaceData()
saveAndPrepare()

val allKeyIds = keys.map { it.id }.toList()
val keyIds = allKeyIds.take(700)

performProjectAuthPost(
"start-batch-job/set-keys-namespace",
mapOf(
"keyIds" to keyIds,
"namespace" to "other-namespace"
)
).andIsOk.waitForJobCompleted()

val all = keyService.find(keyIds)
all.count { it.namespace?.name == "other-namespace" }.assert.isEqualTo(keyIds.size)
}

@Test
@ProjectJWTAuthTestMethod
fun `it fails on collision`() {
testData.addNamespaceData()
val key = testData.projectBuilder.addKey(keyName = "key").self
saveAndPrepare()

val jobId = performProjectAuthPost(
"start-batch-job/set-keys-namespace",
mapOf(
"keyIds" to listOf(key.id),
"namespace" to "namespace"
)
).andIsOk.waitForJobCompleted().jobId
keyService.get(key.id).namespace.assert.isNull()
batchJobService.findJobDto(jobId)?.status.assert.isEqualTo(BatchJobStatus.FAILED)
}

fun ResultActions.waitForJobCompleted() = andAssertThatJson {
node("id").isNumber.satisfies(
Consumer {
waitFor(pollTime = 2000) {
val job = batchJobService.findJobDto(it.toLong())
job?.status?.completed == true
}
}
)
}

val ResultActions.jobId: Long
get() {
var jobId: Long? = null
this.andAssertThatJson {
node("id").isNumber.satisfies(
Consumer {
jobId = it.toLong()
}
)
}
return jobId!!
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ enum class ActivityType(
CREATE_PROJECT,
EDIT_PROJECT,
NAMESPACE_EDIT,
BATCH_AUTO_TRANSLATE,
CLEAR_TRANSLATIONS,
COPY_TRANSLATIONS
BATCH_AUTO_TRANSLATE(true),
BATCH_CLEAR_TRANSLATIONS(true),
BATCH_COPY_TRANSLATIONS(true),
BATCH_SET_TRANSLATION_STATE(true),
BATCH_TAG_KEYS(true),
BATCH_UNTAG_KEYS(true),
BATCH_SET_KEYS_NAMESPACE(true)
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.tolgee.batch

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.sentry.Sentry
import io.tolgee.component.SavePointManager
import io.tolgee.component.UsingRedisProvider
import io.tolgee.model.batch.BatchJobChunkExecution
import io.tolgee.model.batch.BatchJobChunkExecutionStatus
Expand All @@ -18,6 +19,7 @@ import org.springframework.data.redis.core.StringRedisTemplate
import org.springframework.stereotype.Service
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.UnexpectedRollbackException
import javax.persistence.EntityManager
import javax.persistence.LockModeType

Expand All @@ -34,7 +36,8 @@ class BatchJobActionService(
private val batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue,
@Lazy
private val redisTemplate: StringRedisTemplate,
private val concurrentExecutionLauncher: BatchJobConcurrentLauncher
private val concurrentExecutionLauncher: BatchJobConcurrentLauncher,
private val savePointManager: SavePointManager
) : Logging {
companion object {
const val MIN_TIME_BETWEEN_OPERATIONS = 10
Expand All @@ -49,38 +52,59 @@ class BatchJobActionService(

concurrentExecutionLauncher.run { executionItem, coroutineContext ->
var retryExecution: BatchJobChunkExecution? = null
val execution = executeInNewTransaction(
transactionManager,
isolationLevel = TransactionDefinition.ISOLATION_DEFAULT
) {
catchingExceptions(executionItem) {
try {
val execution = executeInNewTransaction(
transactionManager,
isolationLevel = TransactionDefinition.ISOLATION_DEFAULT
) { transactionStatus ->
catchingExceptions(executionItem) {

val lockedExecution = getPendingUnlockedExecutionItem(executionItem)
?: return@executeInNewTransaction null
val lockedExecution = getPendingUnlockedExecutionItem(executionItem)
?: return@executeInNewTransaction null

publishRemoveConsuming(executionItem)
publishRemoveConsuming(executionItem)

progressManager.handleJobRunning(lockedExecution.batchJob.id)
val batchJobDto = batchJobService.getJobDto(lockedExecution.batchJob.id)
progressManager.handleJobRunning(lockedExecution.batchJob.id)
val batchJobDto = batchJobService.getJobDto(lockedExecution.batchJob.id)

logger.debug("Job ${batchJobDto.id}: 🟡 Processing chunk ${lockedExecution.id}")
val util = ChunkProcessingUtil(lockedExecution, applicationContext, coroutineContext)
util.processChunk()
logger.debug("Job ${batchJobDto.id}: 🟡 Processing chunk ${lockedExecution.id}")
val savepoint = savePointManager.setSavepoint()
val util = ChunkProcessingUtil(lockedExecution, applicationContext, coroutineContext)
util.processChunk()

progressManager.handleProgress(lockedExecution)
entityManager.persist(lockedExecution)
if (transactionStatus.isRollbackOnly) {
logger.debug("Job ${batchJobDto.id}: 🛑 Rollbacking chunk ${lockedExecution.id}")
savePointManager.rollbackSavepoint(savepoint)
}

if (lockedExecution.retry) {
retryExecution = util.retryExecution
entityManager.persist(util.retryExecution)
}
progressManager.handleProgress(lockedExecution)
entityManager.persist(lockedExecution)

if (lockedExecution.retry) {
retryExecution = util.retryExecution
entityManager.persist(util.retryExecution)
}

logger.debug("Job ${batchJobDto.id}: ✅ Processed chunk ${lockedExecution.id}")
return@executeInNewTransaction lockedExecution
logger.debug("Job ${batchJobDto.id}: ✅ Processed chunk ${lockedExecution.id}")
return@executeInNewTransaction lockedExecution
}
}
execution?.let { progressManager.handleChunkCompletedCommitted(it) }
addRetryExecutionToQueue(retryExecution)
} catch (e: Throwable) {
when (e) {
is UnexpectedRollbackException -> {
logger.debug(
"Job ${executionItem.jobId}: ⚠️ Chunk ${executionItem.chunkExecutionId}" +
" thrown UnexpectedRollbackException"
)
}
else -> {
logger.error("Job ${executionItem.jobId}: ⚠️ Chunk ${executionItem.chunkExecutionId} thrown error", e)
Sentry.captureException(e)
}
}
}
execution?.let { progressManager.handleChunkCompletedCommitted(it) }
addRetryExecutionToQueue(retryExecution)
}
}

Expand Down
Loading

0 comments on commit f2aac7c

Please sign in to comment.