Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improvements cherypicked #1835

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ When no languages provided, it translates only untranslated languages."""
key = key,
languageTags = languages?.toList(),
useTranslationMemory = useTranslationMemory ?: false,
useMachineTranslation = useMachineTranslation ?: false
useMachineTranslation = useMachineTranslation ?: false,
isBatch = true
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() {
waitForNotThrowing(pollTime = 1000) {
verify(
preTranslationByTmChunkProcessor,
times(ceil(job.totalItems.toDouble() / BatchJobType.PRE_TRANSLATE_BY_MT.chunkSize).toInt())
times(ceil(job.totalItems.toDouble() / 10).toInt())
).process(any(), any(), any(), any())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ abstract class AbstractCacheTest : AbstractSpringTest() {
keyName = "key-name",
sourceLanguageTag = "en",
targetLanguageTag = "de",
serviceType = MtServiceType.GOOGLE
serviceType = MtServiceType.GOOGLE,
isBatch = false
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BatchJobActionService(
private val savePointManager: SavePointManager
) : Logging {
companion object {
const val MIN_TIME_BETWEEN_OPERATIONS = 10
const val MIN_TIME_BETWEEN_OPERATIONS = 100
}

@EventListener(ApplicationReadyEvent::class)
Expand Down Expand Up @@ -90,15 +90,17 @@ class BatchJobActionService(
}
}
execution?.let { progressManager.handleChunkCompletedCommitted(it) }
addRetryExecutionToQueue(retryExecution)
addRetryExecutionToQueue(retryExecution, jobCharacter = executionItem.jobCharacter)
} catch (e: Throwable) {
progressManager.rollbackSetToRunning(executionItem.chunkExecutionId, executionItem.jobId)
when (e) {
is UnexpectedRollbackException -> {
logger.debug(
"Job ${executionItem.jobId}: ⚠️ Chunk ${executionItem.chunkExecutionId}" +
" thrown UnexpectedRollbackException"
)
}

else -> {
logger.error("Job ${executionItem.jobId}: ⚠️ Chunk ${executionItem.chunkExecutionId} thrown error", e)
Sentry.captureException(e)
Expand All @@ -112,18 +114,20 @@ class BatchJobActionService(
val lockedExecution = getExecutionIfCanAcquireLock(executionItem.chunkExecutionId)
if (lockedExecution == null) {
logger.debug("⚠️ Chunk ${executionItem.chunkExecutionId} is locked, skipping")
progressManager.rollbackSetToRunning(executionItem.chunkExecutionId, executionItem.jobId)
return null
}
if (lockedExecution.status != BatchJobChunkExecutionStatus.PENDING) {
logger.debug("⚠️ Chunk ${executionItem.chunkExecutionId} is not pending, skipping")
progressManager.rollbackSetToRunning(executionItem.chunkExecutionId, executionItem.jobId)
return null
}
return lockedExecution
}

private fun addRetryExecutionToQueue(retryExecution: BatchJobChunkExecution?) {
private fun addRetryExecutionToQueue(retryExecution: BatchJobChunkExecution?, jobCharacter: JobCharacter) {
retryExecution?.let {
batchJobChunkExecutionQueue.addToQueue(listOf(it))
batchJobChunkExecutionQueue.addToQueue(it, jobCharacter)
logger.debug("Job ${it.batchJob.id}: Added chunk ${it.id} for re-trial")
}
}
Expand Down Expand Up @@ -151,8 +155,8 @@ class BatchJobActionService(
entityManager.createNativeQuery("""SET enable_seqscan=off""")
return entityManager.createQuery(
"""
from BatchJobChunkExecution bjce
where bjce.id = :id
from BatchJobChunkExecution bjce
where bjce.id = :id
""".trimIndent(),
BatchJobChunkExecution::class.java
)
Expand All @@ -166,7 +170,7 @@ class BatchJobActionService(

fun cancelLocalJob(jobId: Long) {
batchJobChunkExecutionQueue.cancelJob(jobId)
concurrentExecutionLauncher.runningJobs.filter { it.value.first == jobId }.forEach {
concurrentExecutionLauncher.runningJobs.filter { it.value.first.id == jobId }.forEach {
it.value.second.cancel()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import io.tolgee.batch.events.OnBatchJobFailed
import io.tolgee.batch.events.OnBatchJobSucceeded
import io.tolgee.batch.state.BatchJobStateProvider
import io.tolgee.fixtures.waitFor
import io.tolgee.util.Logging
import io.tolgee.util.logger
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
import javax.persistence.EntityManager
Expand All @@ -15,7 +17,7 @@ class BatchJobActivityFinalizer(
private val entityManager: EntityManager,
private val activityHolder: ActivityHolder,
private val batchJobStateProvider: BatchJobStateProvider,
) {
) : Logging {
@EventListener(OnBatchJobSucceeded::class)
fun finalizeActivityWhenJobSucceeded(event: OnBatchJobSucceeded) {
finalizeActivityWhenJobCompleted(event.job)
Expand Down Expand Up @@ -45,27 +47,33 @@ class BatchJobActivityFinalizer(
mergeDescribingEntities(activityRevisionIdToMergeInto, revisionIds)
mergeModifiedEntities(activityRevisionIdToMergeInto, revisionIds)
deleteUnusedRevisions(revisionIds)
setJobIdToRevision(activityRevisionIdToMergeInto, job.id)
setJobIdAndAuthorIdToRevision(activityRevisionIdToMergeInto, job)
}
}

private fun waitForOtherChunksToComplete(job: BatchJobDto) {
waitFor(20000) {
val committedChunks = batchJobStateProvider.get(job.id).values
.count { !it.retry && it.transactionCommitted && it.status.completed }
.count { it.retry == false && it.transactionCommitted && it.status.completed }
logger.debug("Waitinng for completed chunks ($committedChunks) to be equal to all other chunks count (${job.totalChunks - 1})")
committedChunks == job.totalChunks - 1
}
}

private fun setJobIdToRevision(activityRevisionIdToMergeInto: Long, jobId: Long) {
private fun setJobIdAndAuthorIdToRevision(activityRevisionIdToMergeInto: Long, job: BatchJobDto) {
entityManager.createNativeQuery(
"""
update activity_revision set batch_job_chunk_execution_id = null, batch_job_id = :jobId
update activity_revision
set
batch_job_chunk_execution_id = null,
batch_job_id = :jobId,
author_id = :authorId
where id = :activityRevisionIdToMergeInto
"""
)
.setParameter("activityRevisionIdToMergeInto", activityRevisionIdToMergeInto)
.setParameter("jobId", jobId)
.setParameter("jobId", job.id)
.setParameter("authorId", job.authorId)
.executeUpdate()
}

Expand Down Expand Up @@ -126,12 +134,13 @@ class BatchJobActivityFinalizer(
group by entity_class, entity_id
having count(*) > 1)
and
activity_revision_id not in (select min(activity_revision_id)
from activity_describing_entity
where activity_revision_id in (:revisionIds)
or activity_revision_id = :activityRevisionIdToMergeInto
group by entity_class, entity_id
having count(*) > 1)
(activity_revision_id, entity_class, entity_id) not in (
select min(activity_revision_id), entity_class, entity_id
from activity_describing_entity
where activity_revision_id in (:revisionIds)
or activity_revision_id = :activityRevisionIdToMergeInto
group by entity_class, entity_id
having count(*) > 1)
""".trimIndent()
)
.setParameter("activityRevisionIdToMergeInto", activityRevisionIdToMergeInto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BatchJobCancellationManager(
}

fun cancelJob(jobId: Long) {
executeInNewTransaction(
val executions = executeInNewTransaction(
transactionManager = transactionManager,
isolationLevel = TransactionDefinition.ISOLATION_DEFAULT
) {
Expand All @@ -71,9 +71,13 @@ class BatchJobCancellationManager(
executions.forEach { execution ->
execution.status = BatchJobChunkExecutionStatus.CANCELLED
entityManager.persist(execution)
progressManager.handleProgress(execution)
}

executions.forEach { progressManager.handleProgress(it) }
executions
}
executions.forEach {
progressManager.handleChunkCompletedCommitted(it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ class BatchJobChunkExecutionQueue(
"javax.persistence.lock.timeout",
LockOptions.SKIP_LOCKED
).resultList
logger.debug("Adding ${data.size} items to queue ${System.identityHashCode(this)}")
if (data.size > 0) {
logger.debug("Adding ${data.size} items to queue ${System.identityHashCode(this)}")
}
addExecutionsToLocalQueue(data)
}

Expand All @@ -76,25 +78,40 @@ class BatchJobChunkExecutionQueue(
}
}

fun addToQueue(execution: BatchJobChunkExecution, jobCharacter: JobCharacter) {
val item = execution.toItem(jobCharacter)
addItemsToQueue(listOf(item))
}

fun addToQueue(executions: List<BatchJobChunkExecution>) {
val items = executions.map { it.toItem() }
addItemsToQueue(items)
}

private fun addItemsToQueue(items: List<ExecutionQueueItem>) {
if (usingRedisProvider.areWeUsingRedis) {
val items = executions.map { it.toItem() }
val event = JobQueueItemsEvent(items, QueueEventType.ADD)
redisTemplate.convertAndSend(
RedisPubSubReceiverConfiguration.JOB_QUEUE_TOPIC,
jacksonObjectMapper().writeValueAsString(event)
)
return
}
this.addExecutionsToLocalQueue(executions)

this.addItemsToLocalQueue(items)
}

fun cancelJob(jobId: Long) {
queue.removeIf { it.jobId == jobId }
}

private fun BatchJobChunkExecution.toItem() =
ExecutionQueueItem(id, batchJob.id, executeAfter?.time)
private fun BatchJobChunkExecution.toItem(
// Yes. jobCharacter is part of the batchJob entity.
// However, we don't want to fetch it here, because it would be a waste of resources.
// So we can provide the jobCharacter here.
jobCharacter: JobCharacter? = null
) =
ExecutionQueueItem(id, batchJob.id, executeAfter?.time, jobCharacter ?: batchJob.jobCharacter)

val size get() = queue.size

Expand All @@ -117,4 +134,7 @@ class BatchJobChunkExecutionQueue(
fun contains(item: ExecutionQueueItem?): Boolean = queue.contains(item)

fun isEmpty(): Boolean = queue.isEmpty()
fun getJobCharacterCounts(): Map<JobCharacter, Int> {
return queue.groupBy { it.jobCharacter }.map { it.key to it.value.size }.toMap()
}
}
Loading
Loading