diff --git a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/AutoTranslationController.kt b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/AutoTranslationController.kt index 2df8652660..d8b929c146 100644 --- a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/AutoTranslationController.kt +++ b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/AutoTranslationController.kt @@ -75,7 +75,8 @@ When no languages provided, it translates only untranslated languages.""" key = key, languageTags = languages?.toList(), useTranslationMemory = useTranslationMemory ?: false, - useMachineTranslation = useMachineTranslation ?: false + useMachineTranslation = useMachineTranslation ?: false, + isBatch = true ) } diff --git a/backend/app/src/test/kotlin/io/tolgee/batch/AbstractBatchJobsGeneralTest.kt b/backend/app/src/test/kotlin/io/tolgee/batch/AbstractBatchJobsGeneralTest.kt index eb03a5de11..a247219d9d 100644 --- a/backend/app/src/test/kotlin/io/tolgee/batch/AbstractBatchJobsGeneralTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/batch/AbstractBatchJobsGeneralTest.kt @@ -116,7 +116,7 @@ abstract class AbstractBatchJobsGeneralTest : AbstractSpringTest() { waitForNotThrowing(pollTime = 1000) { verify( preTranslationByTmChunkProcessor, - times(ceil(job.totalItems.toDouble() / BatchJobType.PRE_TRANSLATE_BY_MT.chunkSize).toInt()) + times(ceil(job.totalItems.toDouble() / 10).toInt()) ).process(any(), any(), any(), any()) } diff --git a/backend/app/src/test/kotlin/io/tolgee/cache/AbstractCacheTest.kt b/backend/app/src/test/kotlin/io/tolgee/cache/AbstractCacheTest.kt index 0abb8ecb59..42757c6aae 100644 --- a/backend/app/src/test/kotlin/io/tolgee/cache/AbstractCacheTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/cache/AbstractCacheTest.kt @@ -57,7 +57,8 @@ abstract class AbstractCacheTest : AbstractSpringTest() { keyName = "key-name", sourceLanguageTag = "en", targetLanguageTag = "de", - serviceType = MtServiceType.GOOGLE + serviceType = MtServiceType.GOOGLE, + isBatch = false ) } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActionService.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActionService.kt index 822d883486..1e0235d2f8 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActionService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActionService.kt @@ -40,7 +40,7 @@ class BatchJobActionService( private val savePointManager: SavePointManager ) : Logging { companion object { - const val MIN_TIME_BETWEEN_OPERATIONS = 10 + const val MIN_TIME_BETWEEN_OPERATIONS = 100 } @EventListener(ApplicationReadyEvent::class) @@ -90,8 +90,9 @@ class BatchJobActionService( } } execution?.let { progressManager.handleChunkCompletedCommitted(it) } - addRetryExecutionToQueue(retryExecution) + addRetryExecutionToQueue(retryExecution, jobCharacter = executionItem.jobCharacter) } catch (e: Throwable) { + progressManager.rollbackSetToRunning(executionItem.chunkExecutionId, executionItem.jobId) when (e) { is UnexpectedRollbackException -> { logger.debug( @@ -99,6 +100,7 @@ class BatchJobActionService( " thrown UnexpectedRollbackException" ) } + else -> { logger.error("Job ${executionItem.jobId}: ⚠️ Chunk ${executionItem.chunkExecutionId} thrown error", e) Sentry.captureException(e) @@ -112,18 +114,20 @@ class BatchJobActionService( val lockedExecution = getExecutionIfCanAcquireLock(executionItem.chunkExecutionId) if (lockedExecution == null) { logger.debug("⚠️ Chunk ${executionItem.chunkExecutionId} is locked, skipping") + progressManager.rollbackSetToRunning(executionItem.chunkExecutionId, executionItem.jobId) return null } if (lockedExecution.status != BatchJobChunkExecutionStatus.PENDING) { logger.debug("⚠️ Chunk ${executionItem.chunkExecutionId} is not pending, skipping") + progressManager.rollbackSetToRunning(executionItem.chunkExecutionId, executionItem.jobId) return null } return lockedExecution } - private fun addRetryExecutionToQueue(retryExecution: BatchJobChunkExecution?) { + private fun addRetryExecutionToQueue(retryExecution: BatchJobChunkExecution?, jobCharacter: JobCharacter) { retryExecution?.let { - batchJobChunkExecutionQueue.addToQueue(listOf(it)) + batchJobChunkExecutionQueue.addToQueue(it, jobCharacter) logger.debug("Job ${it.batchJob.id}: Added chunk ${it.id} for re-trial") } } @@ -151,8 +155,8 @@ class BatchJobActionService( entityManager.createNativeQuery("""SET enable_seqscan=off""") return entityManager.createQuery( """ - from BatchJobChunkExecution bjce - where bjce.id = :id + from BatchJobChunkExecution bjce + where bjce.id = :id """.trimIndent(), BatchJobChunkExecution::class.java ) @@ -166,7 +170,7 @@ class BatchJobActionService( fun cancelLocalJob(jobId: Long) { batchJobChunkExecutionQueue.cancelJob(jobId) - concurrentExecutionLauncher.runningJobs.filter { it.value.first == jobId }.forEach { + concurrentExecutionLauncher.runningJobs.filter { it.value.first.id == jobId }.forEach { it.value.second.cancel() } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActivityFinalizer.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActivityFinalizer.kt index 465ab4e3a2..36ad613265 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActivityFinalizer.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobActivityFinalizer.kt @@ -6,6 +6,8 @@ import io.tolgee.batch.events.OnBatchJobFailed import io.tolgee.batch.events.OnBatchJobSucceeded import io.tolgee.batch.state.BatchJobStateProvider import io.tolgee.fixtures.waitFor +import io.tolgee.util.Logging +import io.tolgee.util.logger import org.springframework.context.event.EventListener import org.springframework.stereotype.Component import javax.persistence.EntityManager @@ -15,7 +17,7 @@ class BatchJobActivityFinalizer( private val entityManager: EntityManager, private val activityHolder: ActivityHolder, private val batchJobStateProvider: BatchJobStateProvider, -) { +) : Logging { @EventListener(OnBatchJobSucceeded::class) fun finalizeActivityWhenJobSucceeded(event: OnBatchJobSucceeded) { finalizeActivityWhenJobCompleted(event.job) @@ -45,27 +47,33 @@ class BatchJobActivityFinalizer( mergeDescribingEntities(activityRevisionIdToMergeInto, revisionIds) mergeModifiedEntities(activityRevisionIdToMergeInto, revisionIds) deleteUnusedRevisions(revisionIds) - setJobIdToRevision(activityRevisionIdToMergeInto, job.id) + setJobIdAndAuthorIdToRevision(activityRevisionIdToMergeInto, job) } } private fun waitForOtherChunksToComplete(job: BatchJobDto) { waitFor(20000) { val committedChunks = batchJobStateProvider.get(job.id).values - .count { !it.retry && it.transactionCommitted && it.status.completed } + .count { it.retry == false && it.transactionCommitted && it.status.completed } + logger.debug("Waitinng for completed chunks ($committedChunks) to be equal to all other chunks count (${job.totalChunks - 1})") committedChunks == job.totalChunks - 1 } } - private fun setJobIdToRevision(activityRevisionIdToMergeInto: Long, jobId: Long) { + private fun setJobIdAndAuthorIdToRevision(activityRevisionIdToMergeInto: Long, job: BatchJobDto) { entityManager.createNativeQuery( """ - update activity_revision set batch_job_chunk_execution_id = null, batch_job_id = :jobId + update activity_revision + set + batch_job_chunk_execution_id = null, + batch_job_id = :jobId, + author_id = :authorId where id = :activityRevisionIdToMergeInto """ ) .setParameter("activityRevisionIdToMergeInto", activityRevisionIdToMergeInto) - .setParameter("jobId", jobId) + .setParameter("jobId", job.id) + .setParameter("authorId", job.authorId) .executeUpdate() } @@ -126,12 +134,13 @@ class BatchJobActivityFinalizer( group by entity_class, entity_id having count(*) > 1) and - activity_revision_id not in (select min(activity_revision_id) - from activity_describing_entity - where activity_revision_id in (:revisionIds) - or activity_revision_id = :activityRevisionIdToMergeInto - group by entity_class, entity_id - having count(*) > 1) + (activity_revision_id, entity_class, entity_id) not in ( + select min(activity_revision_id), entity_class, entity_id + from activity_describing_entity + where activity_revision_id in (:revisionIds) + or activity_revision_id = :activityRevisionIdToMergeInto + group by entity_class, entity_id + having count(*) > 1) """.trimIndent() ) .setParameter("activityRevisionIdToMergeInto", activityRevisionIdToMergeInto) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt index 708bd4ca05..a56abcdb23 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt @@ -46,7 +46,7 @@ class BatchJobCancellationManager( } fun cancelJob(jobId: Long) { - executeInNewTransaction( + val executions = executeInNewTransaction( transactionManager = transactionManager, isolationLevel = TransactionDefinition.ISOLATION_DEFAULT ) { @@ -71,9 +71,13 @@ class BatchJobCancellationManager( executions.forEach { execution -> execution.status = BatchJobChunkExecutionStatus.CANCELLED entityManager.persist(execution) + progressManager.handleProgress(execution) } - executions.forEach { progressManager.handleProgress(it) } + executions + } + executions.forEach { + progressManager.handleChunkCompletedCommitted(it) } } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt index 3776d42520..b03fc8c7d3 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt @@ -55,7 +55,9 @@ class BatchJobChunkExecutionQueue( "javax.persistence.lock.timeout", LockOptions.SKIP_LOCKED ).resultList - logger.debug("Adding ${data.size} items to queue ${System.identityHashCode(this)}") + if (data.size > 0) { + logger.debug("Adding ${data.size} items to queue ${System.identityHashCode(this)}") + } addExecutionsToLocalQueue(data) } @@ -76,9 +78,18 @@ class BatchJobChunkExecutionQueue( } } + fun addToQueue(execution: BatchJobChunkExecution, jobCharacter: JobCharacter) { + val item = execution.toItem(jobCharacter) + addItemsToQueue(listOf(item)) + } + fun addToQueue(executions: List) { + val items = executions.map { it.toItem() } + addItemsToQueue(items) + } + + private fun addItemsToQueue(items: List) { if (usingRedisProvider.areWeUsingRedis) { - val items = executions.map { it.toItem() } val event = JobQueueItemsEvent(items, QueueEventType.ADD) redisTemplate.convertAndSend( RedisPubSubReceiverConfiguration.JOB_QUEUE_TOPIC, @@ -86,15 +97,21 @@ class BatchJobChunkExecutionQueue( ) return } - this.addExecutionsToLocalQueue(executions) + + this.addItemsToLocalQueue(items) } fun cancelJob(jobId: Long) { queue.removeIf { it.jobId == jobId } } - private fun BatchJobChunkExecution.toItem() = - ExecutionQueueItem(id, batchJob.id, executeAfter?.time) + private fun BatchJobChunkExecution.toItem( + // Yes. jobCharacter is part of the batchJob entity. + // However, we don't want to fetch it here, because it would be a waste of resources. + // So we can provide the jobCharacter here. + jobCharacter: JobCharacter? = null + ) = + ExecutionQueueItem(id, batchJob.id, executeAfter?.time, jobCharacter ?: batchJob.jobCharacter) val size get() = queue.size @@ -117,4 +134,7 @@ class BatchJobChunkExecutionQueue( fun contains(item: ExecutionQueueItem?): Boolean = queue.contains(item) fun isEmpty(): Boolean = queue.isEmpty() + fun getJobCharacterCounts(): Map { + return queue.groupBy { it.jobCharacter }.map { it.key to it.value.size }.toMap() + } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt index 3d213c768f..15f1399ee9 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt @@ -4,6 +4,7 @@ import io.sentry.Sentry import io.tolgee.component.CurrentDateProvider import io.tolgee.configuration.tolgee.BatchProperties import io.tolgee.fixtures.waitFor +import io.tolgee.model.batch.BatchJobChunkExecutionStatus import io.tolgee.util.Logging import io.tolgee.util.logger import kotlinx.coroutines.CoroutineScope @@ -16,19 +17,27 @@ import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import javax.annotation.PreDestroy import kotlin.coroutines.CoroutineContext +import kotlin.math.ceil @Component class BatchJobConcurrentLauncher( private val batchProperties: BatchProperties, private val batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue, private val currentDateProvider: CurrentDateProvider, + private val batchJobService: BatchJobService, + private val progressManager: ProgressManager, ) : Logging { companion object { val runningInstances: ConcurrentHashMap.KeySetView = ConcurrentHashMap.newKeySet() } - val runningJobs: ConcurrentHashMap> = ConcurrentHashMap() + /** + * execution id -> Pair(BatchJobDto, Job) + * + * Job is the result of launch method executing the execution in separate coroutine + */ + val runningJobs: ConcurrentHashMap> = ConcurrentHashMap() var pause = false set(value) { @@ -58,7 +67,7 @@ class BatchJobConcurrentLauncher( this.stop() } - fun repeatForever(fn: () -> Unit) { + fun repeatForever(fn: () -> Boolean) { runningInstances.forEach { it.stop() } runningInstances.add(this) @@ -66,8 +75,8 @@ class BatchJobConcurrentLauncher( while (run) { try { val startTime = System.currentTimeMillis() - fn() - val sleepTime = getSleepTime(startTime) + val somethingHandled = fn() + val sleepTime = getSleepTime(startTime, somethingHandled) if (sleepTime > 0) { Thread.sleep(sleepTime) } @@ -78,8 +87,8 @@ class BatchJobConcurrentLauncher( } } - private fun getSleepTime(startTime: Long): Long { - if (!batchJobChunkExecutionQueue.isEmpty() && jobsToLaunch > 0) { + private fun getSleepTime(startTime: Long, somethingHandled: Boolean): Long { + if (!batchJobChunkExecutionQueue.isEmpty() && jobsToLaunch > 0 && somethingHandled) { return 0 } return BatchJobActionService.MIN_TIME_BETWEEN_OPERATIONS - (System.currentTimeMillis() - startTime) @@ -90,12 +99,12 @@ class BatchJobConcurrentLauncher( masterRunJob = GlobalScope.launch(Dispatchers.IO) { repeatForever { if (pause) { - return@repeatForever + return@repeatForever false } val jobsToLaunch = jobsToLaunch if (jobsToLaunch <= 0) { - return@repeatForever + return@repeatForever false } logger.trace("Jobs to launch: $jobsToLaunch") @@ -104,20 +113,21 @@ class BatchJobConcurrentLauncher( logItemsPulled(items) - items.forEach { executionItem -> + // when something handled, return true + items.map { executionItem -> handleItem(executionItem, processExecution) - } + }.any() } } } private fun logItemsPulled(items: List) { if (items.isNotEmpty()) { - logger.debug( + logger.trace( "Pulled ${items.size} items from queue: " + items.joinToString(", ") { it.chunkExecutionId.toString() } ) - logger.debug( + logger.trace( "${batchJobChunkExecutionQueue.size} is left in the queue " + "(${System.identityHashCode(batchJobChunkExecutionQueue)}): " + batchJobChunkExecutionQueue.joinToString(", ") { it.chunkExecutionId.toString() } @@ -125,29 +135,57 @@ class BatchJobConcurrentLauncher( } } + /** + * Returns true if item was handled + */ private fun CoroutineScope.handleItem( executionItem: ExecutionQueueItem, processExecution: (executionItem: ExecutionQueueItem, coroutineContext: CoroutineContext) -> Unit - ) { + ): Boolean { + logger.trace("Trying to run execution ${executionItem.chunkExecutionId}") if (!executionItem.isTimeToExecute()) { - logger.debug( + logger.trace( """Execution ${executionItem.chunkExecutionId} not ready to execute, adding back to queue: | Difference ${executionItem.executeAfter!! - currentDateProvider.date.time}""".trimMargin() ) - batchJobChunkExecutionQueue.addItemsToLocalQueue(listOf(executionItem)) - return + addBackToQueue(executionItem) + return false + } + if (!canRunJobWithCharacter(executionItem.jobCharacter)) { + logger.trace( + """Execution ${executionItem.chunkExecutionId} cannot run concurrent job + |(there are already max coroutines working on this specific job)""".trimMargin() + ) + addBackToQueue(executionItem) + return false + } + + if (!executionItem.trySetRunningState()) { + logger.trace( + """Execution ${executionItem.chunkExecutionId} cannot run concurrent job + |(there are already max concurrent executions running of this specific job)""".trimMargin() + ) + addBackToQueue(executionItem) + return false } val job = launch { processExecution(executionItem, this.coroutineContext) } - runningJobs[executionItem.chunkExecutionId] = executionItem.jobId to job + val batchJobDto = batchJobService.getJobDto(executionItem.jobId) + runningJobs[executionItem.chunkExecutionId] = batchJobDto to job job.invokeOnCompletion { onJobCompleted(executionItem) } logger.debug("Execution ${executionItem.chunkExecutionId} launched. Running jobs: ${runningJobs.size}") + return true + } + + private fun addBackToQueue(executionItem: ExecutionQueueItem) { + logger.trace("Adding execution $executionItem back to queue") + batchJobChunkExecutionQueue.addItemsToLocalQueue(listOf(executionItem)) } private fun onJobCompleted(executionItem: ExecutionQueueItem) { @@ -162,4 +200,25 @@ class BatchJobConcurrentLauncher( val executeAfter = this.executeAfter ?: return true return executeAfter <= currentDateProvider.date.time } + + private fun canRunJobWithCharacter(character: JobCharacter): Boolean { + val queueCharacterCounts = batchJobChunkExecutionQueue.getJobCharacterCounts() + val otherCharactersInQueueCount = queueCharacterCounts.filter { it.key != character }.values.sum() + if (otherCharactersInQueueCount == 0) { + return true + } + val runningJobCharacterCounts = runningJobs.values.filter { it.first.jobCharacter == character }.size + val allowedCharacterCounts = ceil(character.maxConcurrencyRatio * batchProperties.concurrency) + return runningJobCharacterCounts < allowedCharacterCounts + } + + private fun ExecutionQueueItem.trySetRunningState(): Boolean { + return progressManager.trySetExecutionRunning(this.chunkExecutionId, this.jobId) { + val count = it.values.count { executionState -> executionState.status == BatchJobChunkExecutionStatus.RUNNING } + if (count == 0) { + return@trySetExecutionRunning true + } + batchJobService.getJobDto(this.jobId).maxPerJobConcurrency > count + } + } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobDto.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobDto.kt index d15c1de334..2308f2e2c2 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobDto.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobDto.kt @@ -8,13 +8,15 @@ class BatchJobDto( override var id: Long, val projectId: Long, val authorId: Long?, - val target: List, + val target: List, val totalItems: Int, val totalChunks: Int, val chunkSize: Int, override var status: BatchJobStatus, val type: BatchJobType, val params: Any?, + var maxPerJobConcurrency: Int, + var jobCharacter: JobCharacter, ) : IBatchJob { val chunkedTarget get() = BatchJob.chunkTarget(chunkSize, target) @@ -31,6 +33,8 @@ class BatchJobDto( status = entity.status, type = entity.type, params = entity.params, + maxPerJobConcurrency = entity.maxPerJobConcurrency, + jobCharacter = entity.jobCharacter, ) } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt index 1cf7ff287e..fd04152512 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt @@ -46,15 +46,15 @@ class BatchJobService( ) : Logging { @Transactional - fun startJob( - request: RequestType, + fun startJob( + request: Any, project: Project, author: UserAccount?, type: BatchJobType ): BatchJob { var executions: List? = null val job = executeInNewTransaction(transactionManager) { - val processor = getProcessor(type) + val processor = getProcessor(type) val target = processor.getTarget(request) val job = BatchJob().apply { @@ -62,16 +62,19 @@ class BatchJobService( this.author = author this.target = target this.totalItems = target.size - this.chunkSize = type.chunkSize + this.chunkSize = processor.getChunkSize(projectId = project.id, request = request) + this.jobCharacter = processor.getJobCharacter() + this.maxPerJobConcurrency = processor.getMaxPerJobConcurrency() this.type = type } + val chunked = job.chunkedTarget job.totalChunks = chunked.size cachingBatchJobService.saveJob(job) job.params = processor.getParams(request) - executions = chunked.mapIndexed { chunkNumber, _ -> + executions = List(chunked.size) { chunkNumber -> BatchJobChunkExecution().apply { batchJob = job this.chunkNumber = chunkNumber @@ -187,9 +190,8 @@ class BatchJobService( return BatchJobView(job, progress, errorMessage) } - @Suppress("USELESS_CAST") - fun getProcessor(type: BatchJobType): ChunkProcessor = - applicationContext.getBean(type.processor.java) as ChunkProcessor + fun getProcessor(type: BatchJobType): ChunkProcessor = + applicationContext.getBean(type.processor.java) as ChunkProcessor fun deleteAllByProjectId(projectId: Long) { val batchJobs = getAllByProjectId(projectId) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobType.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobType.kt index dcb1cfa7a3..b40b77c8e3 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobType.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobType.kt @@ -17,62 +17,52 @@ enum class BatchJobType( /** * 0 means no chunking */ - val chunkSize: Int, val maxRetries: Int, - val processor: KClass>, + val processor: KClass>, val defaultRetryWaitTimeInMs: Int = 2000, ) { PRE_TRANSLATE_BY_MT( activityType = ActivityType.BATCH_PRE_TRANSLATE_BY_MT, - chunkSize = 10, maxRetries = 3, processor = PreTranslationByTmChunkProcessor::class, ), MACHINE_TRANSLATE( activityType = ActivityType.BATCH_MACHINE_TRANSLATE, - chunkSize = 10, maxRetries = 3, processor = MachineTranslationChunkProcessor::class, ), DELETE_KEYS( activityType = ActivityType.KEY_DELETE, - chunkSize = 0, maxRetries = 3, processor = DeleteKeysChunkProcessor::class, ), SET_TRANSLATIONS_STATE( activityType = ActivityType.BATCH_SET_TRANSLATION_STATE, - chunkSize = 0, maxRetries = 3, processor = SetTranslationsStateChunkProcessor::class, ), CLEAR_TRANSLATIONS( activityType = ActivityType.BATCH_CLEAR_TRANSLATIONS, - chunkSize = 0, maxRetries = 3, processor = ClearTranslationsChunkProcessor::class, ), COPY_TRANSLATIONS( activityType = ActivityType.BATCH_COPY_TRANSLATIONS, - chunkSize = 0, maxRetries = 3, processor = CopyTranslationsChunkProcessor::class, ), TAG_KEYS( activityType = ActivityType.BATCH_TAG_KEYS, - chunkSize = 0, maxRetries = 3, processor = TagKeysChunkProcessor::class, ), UNTAG_KEYS( activityType = ActivityType.BATCH_UNTAG_KEYS, - chunkSize = 0, maxRetries = 3, processor = UntagKeysChunkProcessor::class, ), SET_KEYS_NAMESPACE( activityType = ActivityType.BATCH_SET_KEYS_NAMESPACE, - chunkSize = 0, maxRetries = 3, processor = SetKeysNamespaceChunkProcessor::class, ); diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchTranslationTargetItem.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchTranslationTargetItem.kt new file mode 100644 index 0000000000..49f591a6de --- /dev/null +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchTranslationTargetItem.kt @@ -0,0 +1,3 @@ +package io.tolgee.batch + +data class BatchTranslationTargetItem(val keyId: Long, val languageId: Long) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessingUtil.kt b/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessingUtil.kt index 01bf5f9b9b..a2380b849c 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessingUtil.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessingUtil.kt @@ -1,13 +1,17 @@ package io.tolgee.batch +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.sentry.Sentry import io.tolgee.activity.ActivityHolder import io.tolgee.component.CurrentDateProvider +import io.tolgee.component.machineTranslation.TranslationApiRateLimitException import io.tolgee.exceptions.ExceptionWithMessage +import io.tolgee.exceptions.OutOfCreditsException import io.tolgee.model.batch.BatchJobChunkExecution import io.tolgee.model.batch.BatchJobChunkExecutionStatus import io.tolgee.util.Logging import io.tolgee.util.logger +import org.apache.commons.lang3.exception.ExceptionUtils import org.hibernate.LockOptions import org.springframework.context.ApplicationContext import java.util.* @@ -24,7 +28,6 @@ open class ChunkProcessingUtil( open fun processChunk() { val time = measureTimeMillis { try { - val processor = batchJobService.getProcessor(job.type) processor.process(job, toProcess, coroutineContext) { if (it != toProcess.size) { progressManager.publishSingleChunkProgress(job.id, it) @@ -50,6 +53,7 @@ open class ChunkProcessingUtil( val batchJobDto = batchJobService.getJobDto(job.id) activityRevision.projectId = batchJobDto.projectId activityHolder.activity = batchJobDto.type.activityType + activityRevision.authorId = batchJobDto.authorId } private fun handleException(exception: Throwable) { @@ -58,12 +62,12 @@ open class ChunkProcessingUtil( return } - execution.exception = exception.stackTraceToString() + execution.stackTrace = exception.stackTraceToString() execution.status = BatchJobChunkExecutionStatus.FAILED execution.errorMessage = (exception as? ExceptionWithMessage)?.tolgeeMessage + execution.errorKey = ExceptionUtils.getRootCause(exception)?.javaClass?.simpleName - Sentry.captureException(exception) - logger.error(exception.message, exception) + logException(exception) if (exception is ChunkFailedException) { successfulTargets = exception.successfulTargets @@ -77,6 +81,18 @@ open class ChunkProcessingUtil( retryFailedExecution(exception) } + private fun logException(exception: Throwable) { + val knownCauses = listOf( + OutOfCreditsException::class.java, TranslationApiRateLimitException::class.java + ) + + val isKnownCause = knownCauses.any { ExceptionUtils.indexOfType(exception, it) > -1 } + if (!isKnownCause) { + Sentry.captureException(exception) + logger.error(exception.message, exception) + } + } + private fun retryFailedExecution(exception: Throwable) { var maxRetries = job.type.maxRetries var waitTime = job.type.defaultRetryWaitTimeInMs @@ -86,19 +102,19 @@ open class ChunkProcessingUtil( waitTime = getWaitTime(exception) } - if (retries >= maxRetries) { - logger.debug("Max retries reached for job execution $execution") + if (errorKeyRetries >= maxRetries && maxRetries != -1) { + logger.debug("Max retries reached for job execution ${execution.id}") Sentry.captureException(exception) return } - logger.debug("Retrying job execution $execution in ${waitTime}ms") + logger.debug("Retrying job execution ${execution.id} in ${waitTime}ms") retryExecution.executeAfter = Date(waitTime + currentDateProvider.date.time) execution.retry = true } private fun getWaitTime(exception: RequeueWithDelayException) = - exception.delayInMs * (exception.increaseFactor.toDouble().pow(retries.toDouble())).toInt() + exception.delayInMs * (exception.increaseFactor.toDouble().pow(errorKeyRetries.toDouble())).toInt() private val job by lazy { batchJobService.getJobDto(execution.batchJob.id) } @@ -122,17 +138,44 @@ open class ChunkProcessingUtil( applicationContext.getBean(ProgressManager::class.java) } - private var successfulTargets: List? = null + private val processor by lazy { + batchJobService.getProcessor(job.type) + } + + private var successfulTargets: List? = null private val toProcess by lazy { - val chunked = job.chunkedTarget - val chunk = chunked[execution.chunkNumber] - val previousSuccessfulTargets = previousExecutions.flatMap { it.successTargets }.toSet() val toProcess = chunk.toMutableSet() toProcess.removeAll(previousSuccessfulTargets) toProcess.toList() } + private val previousSuccessfulTargets by lazy { + previousExecutions.flatMap { + // this is important!! + // we want the equals check to be run on the correct type with correct class instances + convertChunkToItsType(it.successTargets) + }.toSet() + } + + /** + * We need to convert the chunk to the right type, so we pass it to the processor correctly + * + * e.g. It can happen that the chunk is converted to a list of integers for caching, but + * we actually need a list of Long + */ + private val chunk by lazy { + val chunked = job.chunkedTarget + val chunk = chunked[execution.chunkNumber] + convertChunkToItsType(chunk) + } + + private fun convertChunkToItsType(chunk: List): List { + val type = + jacksonObjectMapper().typeFactory.constructCollectionType(List::class.java, processor.getTargetItemType()) + return jacksonObjectMapper().convertValue(chunk, type) as List + } + val retryExecution: BatchJobChunkExecution by lazy { BatchJobChunkExecution().apply { batchJob = entityManager.getReference(execution.batchJob::class.java, job.id) @@ -141,8 +184,13 @@ open class ChunkProcessingUtil( } } - private val retries: Int by lazy { - previousExecutions.size + private val errorKeyRetries by lazy { + val errorKey = execution.errorKey ?: throw IllegalStateException("Error key is not set") + retries[errorKey] ?: 0 + } + + private val retries: Map by lazy { + previousExecutions.groupBy { it.errorKey }.map { it.key to it.value.size.toLong() }.toMap() } @Suppress("UNCHECKED_CAST") diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessor.kt index e9f69fd5aa..0d48103514 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/ChunkProcessor.kt @@ -3,14 +3,34 @@ package io.tolgee.batch import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import kotlin.coroutines.CoroutineContext -interface ChunkProcessor { - fun process(job: BatchJobDto, chunk: List, coroutineContext: CoroutineContext, onProgress: ((Int) -> Unit)) - fun getTarget(data: RequestType): List +interface ChunkProcessor { + fun process( + job: BatchJobDto, + chunk: List, + coroutineContext: CoroutineContext, + onProgress: ((Int) -> Unit) + ) + + fun getTarget(data: RequestType): List fun getParams(data: RequestType): ParamsType fun getParams(job: BatchJobDto): ParamsType { return jacksonObjectMapper().convertValue(job.params, getParamsType()) } + fun getMaxPerJobConcurrency(): Int { + return -1 + } + + fun getJobCharacter(): JobCharacter { + return JobCharacter.FAST + } + + fun getChunkSize(request: RequestType, projectId: Long): Int { + return 0 + } + fun getParamsType(): Class? + + fun getTargetItemType(): Class } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/ExecutionQueueItem.kt b/backend/data/src/main/kotlin/io/tolgee/batch/ExecutionQueueItem.kt index 628c6d6ab8..f7e23383d7 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/ExecutionQueueItem.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/ExecutionQueueItem.kt @@ -3,5 +3,6 @@ package io.tolgee.batch data class ExecutionQueueItem( val chunkExecutionId: Long, val jobId: Long, - val executeAfter: Long? + val executeAfter: Long?, + val jobCharacter: JobCharacter ) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/JobCharacter.kt b/backend/data/src/main/kotlin/io/tolgee/batch/JobCharacter.kt new file mode 100644 index 0000000000..f6cbe32cb7 --- /dev/null +++ b/backend/data/src/main/kotlin/io/tolgee/batch/JobCharacter.kt @@ -0,0 +1,12 @@ +package io.tolgee.batch + +enum class JobCharacter( + /** + * How many threads can be used for jobs with this character + * When other jobs witch other characters are in queue + */ + val maxConcurrencyRatio: Double, +) { + SLOW(0.2), + FAST(0.8) +} diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/ProgressManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/ProgressManager.kt index 517df09d8d..ed6aa92d3f 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/ProgressManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/ProgressManager.kt @@ -33,6 +33,52 @@ class ProgressManager( private val cachingBatchJobService: CachingBatchJobService ) : Logging { + /** + * This method tries to set execution running in the state + * @param canRunFn function that returns true if execution can be run + */ + fun trySetExecutionRunning( + executionId: Long, + batchJobId: Long, + canRunFn: (Map) -> Boolean + ): Boolean { + return batchJobStateProvider.updateState(batchJobId) { + if (it[executionId] != null) { + // we expect the item wasn't touched by others + // if it was, there are other mechanisms to handle it, + // so we just ignore it + return@updateState true + } + if (canRunFn(it)) { + it[executionId] = + ExecutionState( + successTargets = listOf(), + status = BatchJobChunkExecutionStatus.RUNNING, + chunkNumber = null, + retry = null, + transactionCommitted = false + ) + return@updateState true + } + return@updateState false + } + } + + /** + * This method is called when the execution is not even started, because it was locked or something, + * it doesn't set it when the status is different from RUNNING + */ + fun rollbackSetToRunning( + executionId: Long, + batchJobId: Long, + ) { + return batchJobStateProvider.updateState(batchJobId) { + if (it[executionId]?.status == BatchJobChunkExecutionStatus.RUNNING) { + it.remove(executionId) + } + } + } + fun handleProgress(execution: BatchJobChunkExecution) { val job = batchJobService.getJobDto(execution.batchJob.id) @@ -57,7 +103,8 @@ class ProgressManager( job, progress = info.progress, isAnyCancelled = info.isAnyCancelled, - completedChunks = info.completedChunks + completedChunks = info.completedChunks, + errorMessage = execution.errorMessage ) } @@ -119,7 +166,7 @@ class ProgressManager( var completedChunks = 0L var progress = 0L this.values.forEach { - if (it.status.completed && !it.retry) completedChunks++ + if (it.status.completed && it.retry == false) completedChunks++ progress += it.successTargets.size } val isAnyCancelled = this.values.any { it.status == BatchJobChunkExecutionStatus.CANCELLED } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/exceptions.kt b/backend/data/src/main/kotlin/io/tolgee/batch/exceptions.kt index f2a4242538..7f962f42da 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/exceptions.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/exceptions.kt @@ -5,20 +5,20 @@ import io.tolgee.exceptions.ExceptionWithMessage open class ChunkFailedException( message: Message, - val successfulTargets: List, + val successfulTargets: List, override val cause: Throwable ) : ExceptionWithMessage(message) open class FailedDontRequeueException( message: Message, - successfulTargets: List, + successfulTargets: List, cause: Throwable ) : ChunkFailedException(message, successfulTargets, cause) open class RequeueWithDelayException( message: Message, - successfulTargets: List, + successfulTargets: List, cause: Throwable, val delayInMs: Int = 100, val increaseFactor: Int = 10, diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/ClearTranslationsChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/ClearTranslationsChunkProcessor.kt index 3695864691..24873d4783 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/ClearTranslationsChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/ClearTranslationsChunkProcessor.kt @@ -14,7 +14,7 @@ import kotlin.coroutines.CoroutineContext class ClearTranslationsChunkProcessor( private val translationService: TranslationService, private val entityManager: EntityManager -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, @@ -26,6 +26,7 @@ class ClearTranslationsChunkProcessor( val params = getParams(job) subChunked.forEach { subChunk -> coroutineContext.ensureActive() + @Suppress("UNCHECKED_CAST") translationService.clear(subChunk, params.languageIds) entityManager.flush() progress += subChunk.size @@ -37,6 +38,10 @@ class ClearTranslationsChunkProcessor( return ClearTranslationsJobParams::class.java } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getTarget(data: ClearTranslationsRequest): List { return data.keyIds } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/CopyTranslationsChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/CopyTranslationsChunkProcessor.kt index 35b07e97a6..79341b40d1 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/CopyTranslationsChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/CopyTranslationsChunkProcessor.kt @@ -14,7 +14,7 @@ import kotlin.coroutines.CoroutineContext class CopyTranslationsChunkProcessor( private val translationService: TranslationService, private val entityManager: EntityManager -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, @@ -41,6 +41,10 @@ class CopyTranslationsChunkProcessor( return data.keyIds } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getParams(data: CopyTranslationRequest): CopyTranslationJobParams { return CopyTranslationJobParams().apply { sourceLanguageId = data.sourceLanguageId diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/DeleteKeysChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/DeleteKeysChunkProcessor.kt index 09f520eba9..f05f5a1fda 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/DeleteKeysChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/DeleteKeysChunkProcessor.kt @@ -13,7 +13,7 @@ import kotlin.coroutines.CoroutineContext class DeleteKeysChunkProcessor( private val keyService: KeyService, private val entityManager: EntityManager -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, @@ -25,7 +25,8 @@ class DeleteKeysChunkProcessor( var progress: Int = 0 subChunked.forEach { subChunk -> coroutineContext.ensureActive() - keyService.deleteMultiple(subChunk) + @Suppress("UNCHECKED_CAST") + keyService.deleteMultiple(subChunk as List) entityManager.flush() progress += subChunk.size onProgress.invoke(progress) @@ -40,6 +41,10 @@ class DeleteKeysChunkProcessor( return null } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getTarget(data: DeleteKeysRequest): List { return data.keyIds } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/GenericAutoTranslationChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/GenericAutoTranslationChunkProcessor.kt index 35141e7be9..9d9e768245 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/GenericAutoTranslationChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/GenericAutoTranslationChunkProcessor.kt @@ -1,11 +1,14 @@ package io.tolgee.batch.processors import io.tolgee.batch.BatchJobDto +import io.tolgee.batch.BatchTranslationTargetItem import io.tolgee.batch.FailedDontRequeueException import io.tolgee.batch.RequeueWithDelayException +import io.tolgee.component.CurrentDateProvider +import io.tolgee.component.machineTranslation.TranslationApiRateLimitException import io.tolgee.constants.Message import io.tolgee.exceptions.OutOfCreditsException -import io.tolgee.model.Language +import io.tolgee.service.LanguageService import io.tolgee.service.key.KeyService import io.tolgee.service.translation.AutoTranslationService import kotlinx.coroutines.ensureActive @@ -16,29 +19,44 @@ import kotlin.coroutines.CoroutineContext class GenericAutoTranslationChunkProcessor( private val autoTranslationService: AutoTranslationService, private val keyService: KeyService, + private val currentDateProvider: CurrentDateProvider, + private val languageService: LanguageService ) { fun process( job: BatchJobDto, - chunk: List, + chunk: List, coroutineContext: CoroutineContext, onProgress: (Int) -> Unit, type: Type, - languages: List ) { - val keys = keyService.find(chunk) - val successfulTargets = mutableListOf() - keys.forEach { key -> + val languages = languageService.findByIdIn(chunk.map { it.languageId }.toSet()).associateBy { it.id } + val keys = keyService.find(chunk.map { it.keyId }).associateBy { it.id } + val successfulTargets = mutableListOf() + chunk.forEach { item -> + val (keyId, languageId) = item coroutineContext.ensureActive() try { + val languageTag = languages[languageId]?.tag ?: return@forEach + val key = keys[keyId] ?: return@forEach autoTranslationService.autoTranslate( key = key, - languageTags = languages.map { it.tag }, + languageTags = listOf(languageTag), useTranslationMemory = type == Type.PRE_TRANSLATION_BY_TM, useMachineTranslation = type == Type.MACHINE_TRANSLATION, + isBatch = true ) - successfulTargets.add(key.id) + successfulTargets.add(item) } catch (e: OutOfCreditsException) { throw FailedDontRequeueException(Message.OUT_OF_CREDITS, successfulTargets, e) + } catch (e: TranslationApiRateLimitException) { + throw RequeueWithDelayException( + Message.TRANSLATION_API_RATE_LIMIT, + successfulTargets, + e, + (e.retryAt - currentDateProvider.date.time).toInt(), + increaseFactor = 1, + maxRetries = -1 + ) } catch (e: Throwable) { throw RequeueWithDelayException(Message.TRANSLATION_FAILED, successfulTargets, e) } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/MachineTranslationChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/MachineTranslationChunkProcessor.kt index 43a90a10e5..8a53cc3b66 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/MachineTranslationChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/MachineTranslationChunkProcessor.kt @@ -1,34 +1,37 @@ package io.tolgee.batch.processors import io.tolgee.batch.BatchJobDto +import io.tolgee.batch.BatchTranslationTargetItem import io.tolgee.batch.ChunkProcessor +import io.tolgee.batch.JobCharacter import io.tolgee.batch.request.MachineTranslationRequest +import io.tolgee.constants.MtServiceType +import io.tolgee.model.Project import io.tolgee.model.batch.params.MachineTranslationJobParams -import io.tolgee.service.LanguageService +import io.tolgee.service.machineTranslation.MtServiceConfigService import org.springframework.stereotype.Component +import javax.persistence.EntityManager import kotlin.coroutines.CoroutineContext @Component class MachineTranslationChunkProcessor( - private val languageService: LanguageService, - private val genericAutoTranslationChunkProcessor: GenericAutoTranslationChunkProcessor -) : ChunkProcessor { + private val genericAutoTranslationChunkProcessor: GenericAutoTranslationChunkProcessor, + private val mtServiceConfigService: MtServiceConfigService, + private val entityManager: EntityManager +) : ChunkProcessor { override fun process( job: BatchJobDto, - chunk: List, + chunk: List, coroutineContext: CoroutineContext, onProgress: (Int) -> Unit ) { - val parameters = getParams(job) - val languages = languageService.findByIdIn(parameters.targetLanguageIds) - + @Suppress("UNCHECKED_CAST") genericAutoTranslationChunkProcessor.process( job, chunk, coroutineContext, onProgress, GenericAutoTranslationChunkProcessor.Type.MACHINE_TRANSLATION, - languages ) } @@ -36,8 +39,34 @@ class MachineTranslationChunkProcessor( return MachineTranslationJobParams::class.java } - override fun getTarget(data: MachineTranslationRequest): List { - return data.keyIds + override fun getTarget(data: MachineTranslationRequest): List { + return data.keyIds.flatMap { keyId -> + data.targetLanguageIds.map { languageId -> + BatchTranslationTargetItem(keyId, languageId) + } + } + } + + override fun getMaxPerJobConcurrency(): Int { + return 1 + } + + override fun getJobCharacter(): JobCharacter { + return JobCharacter.SLOW + } + + override fun getChunkSize(request: MachineTranslationRequest, projectId: Long): Int { + val languageIds = request.targetLanguageIds + val project = entityManager.getReference(Project::class.java, projectId) + val services = mtServiceConfigService.getPrimaryServices(languageIds, project).values.toSet() + if (services.contains(MtServiceType.TOLGEE)) { + return 2 + } + return 5 + } + + override fun getTargetItemType(): Class { + return BatchTranslationTargetItem::class.java } override fun getParams(data: MachineTranslationRequest): MachineTranslationJobParams { diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/PreTranslationByTmChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/PreTranslationByTmChunkProcessor.kt index b8e7753303..91153ebb66 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/PreTranslationByTmChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/PreTranslationByTmChunkProcessor.kt @@ -1,6 +1,7 @@ package io.tolgee.batch.processors import io.tolgee.batch.BatchJobDto +import io.tolgee.batch.BatchTranslationTargetItem import io.tolgee.batch.ChunkProcessor import io.tolgee.batch.request.PreTranslationByTmRequest import io.tolgee.model.batch.params.PreTranslationByTmJobParams @@ -12,7 +13,7 @@ import kotlin.coroutines.CoroutineContext class PreTranslationByTmChunkProcessor( private val languageService: LanguageService, private val genericAutoTranslationChunkProcessor: GenericAutoTranslationChunkProcessor -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, @@ -22,16 +23,25 @@ class PreTranslationByTmChunkProcessor( val parameters = getParams(job) val languages = languageService.findByIdIn(parameters.targetLanguageIds) + val preparedChunk = chunk.map { keyId -> + languages.map { language -> + BatchTranslationTargetItem(keyId as Long, language.id) + } + }.flatten() + genericAutoTranslationChunkProcessor.process( job, - chunk, + preparedChunk, coroutineContext, onProgress, GenericAutoTranslationChunkProcessor.Type.PRE_TRANSLATION_BY_TM, - languages ) } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getTarget(data: PreTranslationByTmRequest): List { return data.keyIds } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetKeysNamespaceChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetKeysNamespaceChunkProcessor.kt index cc28e969d4..e496743a75 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetKeysNamespaceChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetKeysNamespaceChunkProcessor.kt @@ -18,12 +18,12 @@ import kotlin.coroutines.CoroutineContext class SetKeysNamespaceChunkProcessor( private val entityManager: EntityManager, private val keyService: KeyService -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, coroutineContext: CoroutineContext, - onProgress: ((Int) -> Unit) + onProgress: (Int) -> Unit ) { val subChunked = chunk.chunked(100) var progress = 0 @@ -44,6 +44,10 @@ class SetKeysNamespaceChunkProcessor( } } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getTarget(data: SetKeysNamespaceRequest): List { return data.keyIds } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetTranslationsStateChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetTranslationsStateChunkProcessor.kt index 2239ac051b..8fad8a4348 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetTranslationsStateChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/SetTranslationsStateChunkProcessor.kt @@ -14,12 +14,12 @@ import kotlin.coroutines.CoroutineContext class SetTranslationsStateChunkProcessor( private val translationService: TranslationService, private val entityManager: EntityManager -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, coroutineContext: CoroutineContext, - onProgress: ((Int) -> Unit) + onProgress: (Int) -> Unit ) { val subChunked = chunk.chunked(1000) var progress: Int = 0 @@ -41,6 +41,10 @@ class SetTranslationsStateChunkProcessor( return SetTranslationStateJobParams::class.java } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getParams(data: SetTranslationsStateStateRequest): SetTranslationStateJobParams { return SetTranslationStateJobParams().apply { languageIds = data.languageIds diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/TagKeysChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/TagKeysChunkProcessor.kt index 79c8856bc9..54500dfe8b 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/TagKeysChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/TagKeysChunkProcessor.kt @@ -14,14 +14,14 @@ import kotlin.coroutines.CoroutineContext class TagKeysChunkProcessor( private val entityManager: EntityManager, private val tagService: TagService -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, coroutineContext: CoroutineContext, - onProgress: ((Int) -> Unit) + onProgress: (Int) -> Unit ) { - val subChunked = chunk.chunked(100) + val subChunked = chunk.chunked(100) as List> var progress: Int = 0 var params = getParams(job) subChunked.forEach { subChunk -> @@ -41,6 +41,10 @@ class TagKeysChunkProcessor( return TagKeysParams::class.java } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getParams(data: TagKeysRequest): TagKeysParams { return TagKeysParams().apply { this.tags = data.tags diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/processors/UntagKeysChunkProcessor.kt b/backend/data/src/main/kotlin/io/tolgee/batch/processors/UntagKeysChunkProcessor.kt index 201198711a..260a246754 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/processors/UntagKeysChunkProcessor.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/processors/UntagKeysChunkProcessor.kt @@ -14,14 +14,15 @@ import kotlin.coroutines.CoroutineContext class UntagKeysChunkProcessor( private val entityManager: EntityManager, private val tagService: TagService -) : ChunkProcessor { +) : ChunkProcessor { override fun process( job: BatchJobDto, chunk: List, coroutineContext: CoroutineContext, - onProgress: ((Int) -> Unit) + onProgress: (Int) -> Unit ) { - val subChunked = chunk.chunked(100) + @Suppress("UNCHECKED_CAST") + val subChunked = chunk.chunked(100) as List> var progress = 0 val params = getParams(job) subChunked.forEach { subChunk -> @@ -41,6 +42,10 @@ class UntagKeysChunkProcessor( return UntagKeysParams::class.java } + override fun getTargetItemType(): Class { + return Long::class.java + } + override fun getParams(data: UntagKeysRequest): UntagKeysParams { return UntagKeysParams().apply { this.tags = data.tags diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/request/SetKeysNamespaceRequest.kt b/backend/data/src/main/kotlin/io/tolgee/batch/request/SetKeysNamespaceRequest.kt index 92ab20108e..535fef4fa2 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/request/SetKeysNamespaceRequest.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/request/SetKeysNamespaceRequest.kt @@ -1,7 +1,6 @@ package io.tolgee.batch.request import io.tolgee.constants.ValidationConstants -import javax.validation.constraints.NotBlank import javax.validation.constraints.NotEmpty import javax.validation.constraints.Size @@ -9,8 +8,6 @@ class SetKeysNamespaceRequest { @NotEmpty var keyIds: List = listOf() - @NotEmpty - @NotBlank @Size(max = ValidationConstants.MAX_NAMESPACE_LENGTH) - var namespace: String = "" + var namespace: String? = null } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/state/ExecutionState.kt b/backend/data/src/main/kotlin/io/tolgee/batch/state/ExecutionState.kt index 75a106a309..9adaec2aa5 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/state/ExecutionState.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/state/ExecutionState.kt @@ -3,9 +3,9 @@ package io.tolgee.batch.state import io.tolgee.model.batch.BatchJobChunkExecutionStatus data class ExecutionState( - var successTargets: List, + var successTargets: List, var status: BatchJobChunkExecutionStatus, - var chunkNumber: Int, - var retry: Boolean, + var chunkNumber: Int?, + var retry: Boolean?, var transactionCommitted: Boolean, ) diff --git a/backend/data/src/main/kotlin/io/tolgee/component/AutoTranslationListener.kt b/backend/data/src/main/kotlin/io/tolgee/component/AutoTranslationListener.kt index d1cfb24d29..e695120d0b 100644 --- a/backend/data/src/main/kotlin/io/tolgee/component/AutoTranslationListener.kt +++ b/backend/data/src/main/kotlin/io/tolgee/component/AutoTranslationListener.kt @@ -24,6 +24,7 @@ class AutoTranslationListener( if (wasUntranslatedBefore && isTranslatedAfter) { autoTranslationService.autoTranslate( key = event.key, + isBatch = true, ) } } catch (e: OutOfCreditsException) { diff --git a/backend/data/src/main/kotlin/io/tolgee/component/bucket/NotEnoughTokensException.kt b/backend/data/src/main/kotlin/io/tolgee/component/bucket/NotEnoughTokensException.kt new file mode 100644 index 0000000000..5f79a90fc4 --- /dev/null +++ b/backend/data/src/main/kotlin/io/tolgee/component/bucket/NotEnoughTokensException.kt @@ -0,0 +1,3 @@ +package io.tolgee.component.bucket + +class NotEnoughTokensException(val refillAt: Long) : RuntimeException("Not enough credits in the bucket") diff --git a/backend/data/src/main/kotlin/io/tolgee/component/bucket/TokenBucket.kt b/backend/data/src/main/kotlin/io/tolgee/component/bucket/TokenBucket.kt new file mode 100644 index 0000000000..aa50a0543a --- /dev/null +++ b/backend/data/src/main/kotlin/io/tolgee/component/bucket/TokenBucket.kt @@ -0,0 +1,18 @@ +package io.tolgee.component.bucket + +import java.time.Duration + +data class TokenBucket( + var refillAt: Long, + var size: Long, + var tokens: Long, + val period: Duration +) { + fun refillIfItsTime(currentTimestamp: Long, newTokens: Long): TokenBucket { + if (refillAt < currentTimestamp) { + this.tokens = newTokens + this.refillAt = currentTimestamp + period.toMillis() + } + return this + } +} diff --git a/backend/data/src/main/kotlin/io/tolgee/component/bucket/TokenBucketManager.kt b/backend/data/src/main/kotlin/io/tolgee/component/bucket/TokenBucketManager.kt new file mode 100644 index 0000000000..38f388657b --- /dev/null +++ b/backend/data/src/main/kotlin/io/tolgee/component/bucket/TokenBucketManager.kt @@ -0,0 +1,107 @@ +package io.tolgee.component.bucket + +import io.tolgee.component.CurrentDateProvider +import io.tolgee.component.LockingProvider +import io.tolgee.component.UsingRedisProvider +import io.tolgee.util.Logging +import io.tolgee.util.logger +import org.redisson.api.RedissonClient +import org.springframework.context.annotation.Lazy +import org.springframework.stereotype.Component +import java.time.Duration +import java.util.concurrent.ConcurrentHashMap + +@Component +class TokenBucketManager( + val usingRedisProvider: UsingRedisProvider, + val currentDateProvider: CurrentDateProvider, + val lockingProvider: LockingProvider, + @Lazy + var redissonClient: RedissonClient +) : Logging { + companion object { + val localTokenBucketStorage = ConcurrentHashMap() + } + + fun consume(bucketId: String, tokensToConsume: Long, bucketSize: Long, renewPeriod: Duration) { + updateBucket(bucketId) { + consumeMappingFn( + tokenBucket = it, + tokensToConsume = tokensToConsume, + bucketSize = bucketSize, + renewPeriod = renewPeriod + ) + } + } + + fun addTokens(bucketId: String, tokensToAdd: Long) { + updateTokens(bucketId) { + it + tokensToAdd + } + } + + fun updateTokens(bucketId: String, updateFn: ((oldTokens: Long) -> Long)) { + updateBucket(bucketId) { + updateMappingFn(it, updateFn) + } + } + + private fun getLockingId(bucketId: String) = "lock_bucket_$bucketId" + + private fun consumeMappingFn( + tokenBucket: TokenBucket?, + tokensToConsume: Long, + bucketSize: Long, + renewPeriod: Duration + ): TokenBucket { + val currentTokenBucket = + getCurrentOrNewBucket(tokenBucket, bucketSize, renewPeriod) + currentTokenBucket.refillIfItsTime(currentDateProvider.date.time, bucketSize) + if (currentTokenBucket.tokens < tokensToConsume) { + throw NotEnoughTokensException(currentTokenBucket.refillAt) + } + return currentTokenBucket.copy(tokens = currentTokenBucket.tokens - tokensToConsume) + } + + private fun setEmptyUntilMappingFn( + tokenBucket: TokenBucket?, + emptyUntil: Long, + ): TokenBucket? { + tokenBucket ?: return null + return tokenBucket.copy(tokens = 0, refillAt = emptyUntil) + } + + private fun getCurrentOrNewBucket( + tokenBucket: TokenBucket?, + bucketSize: Long, + renewPeriod: Duration + ) = tokenBucket ?: TokenBucket(currentDateProvider.date.time, bucketSize, bucketSize, renewPeriod) + + private fun updateMappingFn( + tokenBucket: TokenBucket?, + updateFn: ((oldTokens: Long) -> Long) + ): TokenBucket? { + tokenBucket ?: return null + val newTokens = updateFn(tokenBucket.tokens) + return tokenBucket.copy(tokens = newTokens) + } + + fun setEmptyUntil(bucketId: String, refillAt: Long) { + logger.debug("Setting bucket $bucketId empty for next ${Duration.ofMillis(refillAt - currentDateProvider.date.time).seconds} seconds") + updateBucket(bucketId) { setEmptyUntilMappingFn(it, refillAt) } + } + + fun updateBucket(bucketId: String, mappingFn: (bucket: TokenBucket?) -> TokenBucket?): TokenBucket? { + if (!usingRedisProvider.areWeUsingRedis) { + return localTokenBucketStorage.compute(bucketId) { _, bucket -> + mappingFn(bucket) + } + } + return lockingProvider.withLocking(getLockingId(bucketId)) { + val redissonBucket = redissonClient.getBucket(bucketId) + val newBucket = mappingFn(redissonBucket.get()) + redissonBucket.set(newBucket) + return@withLocking newBucket + } + } +} diff --git a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/MtServiceManager.kt b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/MtServiceManager.kt index d95e023acc..465ba01589 100644 --- a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/MtServiceManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/MtServiceManager.kt @@ -42,11 +42,23 @@ class MtServiceManager( sourceLanguageTag: String, targetLanguageTag: String, services: Collection, - metadata: Metadata? + metadata: Metadata?, + isBatch: Boolean ): Map { return runBlocking(Dispatchers.IO) { services.map { service -> - async { service to translate(text, textRaw, keyName, sourceLanguageTag, targetLanguageTag, service, metadata) } + async { + service to translate( + text = text, + textRaw = textRaw, + keyName = keyName, + sourceLanguageTag = sourceLanguageTag, + targetLanguageTag = targetLanguageTag, + serviceType = service, + metadata = metadata, + isBatch = isBatch + ) + } }.awaitAll().toMap() } } @@ -83,7 +95,8 @@ class MtServiceManager( params.keyName, params.sourceLanguageTag, params.targetLanguageTag, - params.metadata + params.metadata, + params.isBatch ) ) @@ -98,6 +111,21 @@ class MtServiceManager( return translateResult } catch (e: Exception) { + handleSilentFail(params, e) + TranslateResult( + null, + null, + 0, + params.serviceType + ) + } + } + + private fun handleSilentFail(params: TranslationParams, e: Exception) { + val silentFail = !params.isBatch + if (!silentFail) { + throw e + } else { logger.error( """An exception occurred while translating |text "${params.text}" @@ -107,12 +135,6 @@ class MtServiceManager( ) logger.error(e.stackTraceToString()) Sentry.captureException(e) - TranslateResult( - null, - null, - 0, - params.serviceType - ) } } @@ -123,7 +145,8 @@ class MtServiceManager( sourceLanguageTag: String, targetLanguageTag: String, serviceType: MtServiceType, - metadata: Metadata? = null + metadata: Metadata? = null, + isBatch: Boolean ) = TranslationParams( text = text, textRaw = textRaw, @@ -131,7 +154,8 @@ class MtServiceManager( targetLanguageTag = targetLanguageTag, serviceType = serviceType, metadata = metadata, - keyName = keyName + keyName = keyName, + isBatch = isBatch ) private fun getFaked( @@ -167,9 +191,10 @@ class MtServiceManager( sourceLanguageTag: String, targetLanguageTag: String, serviceType: MtServiceType, - metadata: Metadata? = null + metadata: Metadata? = null, + isBatch: Boolean = false ): TranslateResult { - val params = getParams(text, textRaw, keyName, sourceLanguageTag, targetLanguageTag, serviceType, metadata) + val params = getParams(text, textRaw, keyName, sourceLanguageTag, targetLanguageTag, serviceType, metadata, isBatch) return translate(params) } @@ -184,7 +209,8 @@ class MtServiceManager( sourceLanguageTag: String, targetLanguageTags: List, service: MtServiceType, - metadata: Map? = null + metadata: Map? = null, + isBatch: Boolean ): List { return if (!internalProperties.fakeMtProviders) { translateToMultipleTargets( @@ -194,10 +220,11 @@ class MtServiceManager( text = text, sourceLanguageTag = sourceLanguageTag, targetLanguageTags = targetLanguageTags, - metadata = metadata + metadata = metadata, + isBatch = isBatch ) } else targetLanguageTags.map { - getFaked(getParams(text, textRaw, keyName, sourceLanguageTag, it, service, null)) + getFaked(getParams(text, textRaw, keyName, sourceLanguageTag, it, service, null, isBatch)) } } @@ -208,7 +235,8 @@ class MtServiceManager( keyName: String?, sourceLanguageTag: String, targetLanguageTags: List, - metadata: Map? = null + metadata: Map? = null, + isBatch: Boolean ): List { return runBlocking(Dispatchers.IO) { targetLanguageTags.map { targetLanguageTag -> @@ -220,7 +248,8 @@ class MtServiceManager( sourceLanguageTag, targetLanguageTag, serviceType, - metadata?.get(targetLanguageTag) + metadata?.get(targetLanguageTag), + isBatch ) } }.awaitAll() diff --git a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationApiRateLimitException.kt b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationApiRateLimitException.kt new file mode 100644 index 0000000000..08ef8d9b1b --- /dev/null +++ b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationApiRateLimitException.kt @@ -0,0 +1,4 @@ +package io.tolgee.component.machineTranslation + +class TranslationApiRateLimitException(val retryAt: Long, cause: Throwable) : + RuntimeException("Translation API rate limit exceeded", cause) diff --git a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationParams.kt b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationParams.kt index d6048d775a..af8bfcdb77 100644 --- a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationParams.kt +++ b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/TranslationParams.kt @@ -11,7 +11,8 @@ data class TranslationParams( val sourceLanguageTag: String, val targetLanguageTag: String, val serviceType: MtServiceType, - val metadata: Metadata? + val metadata: Metadata?, + val isBatch: Boolean ) { val cacheKey: String diff --git a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/ProviderTranslateParams.kt b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/ProviderTranslateParams.kt index 490bbb3ce7..1bf415a02b 100644 --- a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/ProviderTranslateParams.kt +++ b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/ProviderTranslateParams.kt @@ -8,5 +8,10 @@ data class ProviderTranslateParams( val keyName: String?, var sourceLanguageTag: String, var targetLanguageTag: String, - val metadata: Metadata? = null + val metadata: Metadata? = null, + + /** + * Whether translation is executed as a part of batch translation task + */ + val isBatch: Boolean, ) diff --git a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslateApiService.kt b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslateApiService.kt index 0e51314702..7f2a3d7a96 100644 --- a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslateApiService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslateApiService.kt @@ -1,8 +1,16 @@ package io.tolgee.component.machineTranslation.providers +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import io.tolgee.component.CurrentDateProvider +import io.tolgee.component.bucket.NotEnoughTokensException +import io.tolgee.component.bucket.TokenBucketManager import io.tolgee.component.machineTranslation.MtValueProvider +import io.tolgee.component.machineTranslation.TranslationApiRateLimitException import io.tolgee.component.machineTranslation.metadata.Metadata import io.tolgee.configuration.tolgee.machineTranslation.TolgeeMachineTranslationProperties +import io.tolgee.util.Logging +import io.tolgee.util.logger import org.springframework.beans.factory.config.ConfigurableBeanFactory import org.springframework.context.annotation.Scope import org.springframework.http.HttpEntity @@ -10,16 +18,23 @@ import org.springframework.http.HttpHeaders import org.springframework.http.HttpMethod import org.springframework.http.ResponseEntity import org.springframework.stereotype.Component +import org.springframework.web.client.HttpClientErrorException import org.springframework.web.client.RestTemplate import org.springframework.web.client.exchange +import java.time.Duration +import kotlin.time.ExperimentalTime +import kotlin.time.measureTimedValue @Component @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON) class TolgeeTranslateApiService( private val tolgeeMachineTranslationProperties: TolgeeMachineTranslationProperties, - private val restTemplate: RestTemplate -) { + private val restTemplate: RestTemplate, + private val tokenBucketManager: TokenBucketManager, + private val currentDateProvider: CurrentDateProvider +) : Logging { + @OptIn(ExperimentalTime::class) fun translate(params: TolgeeTranslateParams): MtValueProvider.MtResult { val headers = HttpHeaders() headers.add("Something", null) @@ -34,32 +49,120 @@ class TolgeeTranslateApiService( params.sourceTag, params.targetTag, examples, - closeItems + closeItems, + priority = if (params.isBatch) "low" else "high" ) val request = HttpEntity(requestBody, headers) - val response: ResponseEntity = restTemplate.exchange( - "${tolgeeMachineTranslationProperties.url}/api/openai/translate", - HttpMethod.POST, - request - ) + consumeRateLimitTokens(params) + + val response: ResponseEntity = try { + val (value, time) = measureTimedValue { + restTemplate.exchange( + "${tolgeeMachineTranslationProperties.url}/api/openai/translate", + HttpMethod.POST, + request + ) + } + logger.debug("Translator request took ${time.inWholeMilliseconds} ms") + value + } catch (e: HttpClientErrorException.TooManyRequests) { + val data = e.parse() + tokenBucketManager.addTokens(TOKEN_BUCKET_KEY, tolgeeMachineTranslationProperties.tokensToPreConsume) + syncBuckets(data) + val waitTime = data.retryAfter ?: 0 + logger.debug("Translator thrown TooManyRequests exception. Waiting for ${waitTime}s") + throw TranslationApiRateLimitException(currentDateProvider.date.time + (waitTime * 1000), e) + } val costString = response.headers.get("Mt-Credits-Cost")?.singleOrNull() ?: throw IllegalStateException("No valid Credits-Cost header in response") val cost = costString.toInt() + finalizeTokenCost(params.isBatch, cost) + return MtValueProvider.MtResult( response.body?.output ?: throw RuntimeException(response.toString()), - cost, + cost * 10, response.body?.contextDescription, ) } + private fun finalizeTokenCost(isBatch: Boolean, cost: Int) { + if (!isBatch) { + return + } + tokenBucketManager.addTokens( + TOKEN_BUCKET_KEY, + tolgeeMachineTranslationProperties.tokensToPreConsume - cost + ) + } + + private fun consumeRateLimitTokens(params: TolgeeTranslateParams) { + if (!params.isBatch) { + return + } + + try { + tokenBucketManager.consume( + TOKEN_BUCKET_KEY, + tolgeeMachineTranslationProperties.tokensToPreConsume, + getTokensRateLimitTokensPerInterval(), + BUCKET_INTERVAL + ) + } catch (e: NotEnoughTokensException) { + logger.debug( + "Not enough token rate limit tokens to translate. " + + "Tokens will be refilled at ${Duration.ofMillis(e.refillAt - currentDateProvider.date.time).seconds}s" + ) + throw TranslationApiRateLimitException(e.refillAt, e) + } + + try { + tokenBucketManager.consume( + CALL_BUCKET_KEY, + 1, + getCallRateLimitTokensPerInterval(), + BUCKET_INTERVAL + ) + } catch (e: NotEnoughTokensException) { + logger.debug( + "Not enough call rate limit tokens to translate. " + + "Tokens will be refilled at ${Duration.ofMillis(e.refillAt - currentDateProvider.date.time).seconds}s" + ) + throw TranslationApiRateLimitException(e.refillAt, e) + } + } + + private fun syncBuckets(data: TooManyRequestsData) { + val retryAfter = data.retryAfter ?: return + val bucketKey = when (data.rateLimit) { + "token" -> TOKEN_BUCKET_KEY + "call" -> CALL_BUCKET_KEY + else -> return + } + + tokenBucketManager.setEmptyUntil(bucketKey, currentDateProvider.date.time + retryAfter * 1000) + } + + private fun getTokensRateLimitTokensPerInterval(): Long { + return tolgeeMachineTranslationProperties.batchMaxTokensPerMinute * BUCKET_INTERVAL.seconds / 60 + } + + private fun getCallRateLimitTokensPerInterval(): Long { + return tolgeeMachineTranslationProperties.batchMaxCallsPerMinute * BUCKET_INTERVAL.seconds / 60 + } + /** * Data structure for mapping the AzureCognitive JSON response objects. */ companion object { + private const val TOKEN_BUCKET_KEY = "tolgee-translate-token-rate-limit" + private const val CALL_BUCKET_KEY = "tolgee-translate-call-rate-limit" + + private val BUCKET_INTERVAL = Duration.ofMinutes(1) + class TolgeeTranslateRequest( val input: String, val keyName: String?, @@ -67,6 +170,7 @@ class TolgeeTranslateApiService( val target: String?, val examples: List?, val closeItems: List?, + val priority: String = "low" ) class TolgeeTranslateParams( @@ -74,7 +178,8 @@ class TolgeeTranslateApiService( val keyName: String?, val sourceTag: String, val targetTag: String, - val metadata: Metadata? + val metadata: Metadata?, + val isBatch: Boolean ) class TolgeeTranslateExample( @@ -85,4 +190,14 @@ class TolgeeTranslateApiService( class TolgeeTranslateResponse(val output: String, val contextDescription: String?) } + + class TooManyRequestsData( + val error: String? = null, + val retryAfter: Int? = null, + val rateLimit: String? = null + ) +} + +private fun HttpClientErrorException.TooManyRequests.parse(): TolgeeTranslateApiService.TooManyRequestsData { + return jacksonObjectMapper().readValue(this.responseBodyAsString) } diff --git a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslationProvider.kt b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslationProvider.kt index 42dd97abac..f0c2c93f16 100644 --- a/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslationProvider.kt +++ b/backend/data/src/main/kotlin/io/tolgee/component/machineTranslation/providers/TolgeeTranslationProvider.kt @@ -24,7 +24,8 @@ class TolgeeTranslationProvider( params.keyName, params.sourceLanguageTag, params.targetLanguageTag, - params.metadataOrThrow() + params.metadataOrThrow(), + params.isBatch ) ) } diff --git a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/machineTranslation/TolgeeMachineTranslationProperties.kt b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/machineTranslation/TolgeeMachineTranslationProperties.kt index 821dc0b792..c4bc91f4a0 100644 --- a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/machineTranslation/TolgeeMachineTranslationProperties.kt +++ b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/machineTranslation/TolgeeMachineTranslationProperties.kt @@ -6,5 +6,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties open class TolgeeMachineTranslationProperties( override var defaultEnabled: Boolean = true, override var defaultPrimary: Boolean = false, - var url: String? = null + var url: String? = null, + var batchMaxTokensPerMinute: Long = 100000, + var batchMaxCallsPerMinute: Long = 500, + var tokensToPreConsume: Long = 1000 ) : MachineTranslationServiceProperties diff --git a/backend/data/src/main/kotlin/io/tolgee/constants/Message.kt b/backend/data/src/main/kotlin/io/tolgee/constants/Message.kt index fbe201d71b..544a8eca41 100644 --- a/backend/data/src/main/kotlin/io/tolgee/constants/Message.kt +++ b/backend/data/src/main/kotlin/io/tolgee/constants/Message.kt @@ -152,7 +152,8 @@ enum class Message { TRANSLATION_FAILED, BATCH_JOB_NOT_FOUND, KEY_EXISTS_IN_NAMESPACE, - TAG_IS_BLANK + TAG_IS_BLANK, + TRANSLATION_API_RATE_LIMIT ; val code: String diff --git a/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJob.kt b/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJob.kt index 9332739e38..660442ade1 100644 --- a/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJob.kt +++ b/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJob.kt @@ -3,6 +3,7 @@ package io.tolgee.model.batch import com.vladmihalcea.hibernate.type.json.JsonBinaryType import io.tolgee.batch.BatchJobDto import io.tolgee.batch.BatchJobType +import io.tolgee.batch.JobCharacter import io.tolgee.model.Project import io.tolgee.model.StandardAuditModel import io.tolgee.model.UserAccount @@ -31,7 +32,7 @@ class BatchJob : StandardAuditModel(), IBatchJob { var author: UserAccount? = null @Type(type = "jsonb") - var target: List = listOf() + var target: List = listOf() var totalItems: Int = 0 @@ -53,10 +54,15 @@ class BatchJob : StandardAuditModel(), IBatchJob { val chunkedTarget get() = chunkTarget(chunkSize, target) + var maxPerJobConcurrency: Int = -1 + + @Enumerated(STRING) + var jobCharacter: JobCharacter = JobCharacter.FAST + val dto get() = BatchJobDto.fromEntity(this) companion object { - fun chunkTarget(chunkSize: Int, target: List): List> = + fun chunkTarget(chunkSize: Int, target: List): List> = if (chunkSize == 0) listOf(target) else target.chunked(chunkSize) } } diff --git a/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt b/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt index b5500618a9..d17070e61a 100644 --- a/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt +++ b/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt @@ -39,10 +39,18 @@ class BatchJobChunkExecution : StandardAuditModel() { var chunkNumber: Int = 0 @Type(type = "jsonb") - var successTargets: List = listOf() + var successTargets: List = listOf() @Column(columnDefinition = "text") - var exception: String? = null + var stackTrace: String? = null + + /** + * This is used to count allowed retries + * + * When max retries for this error keys are reached, the chunk will be marked as failed + */ + @Column(columnDefinition = "text") + var errorKey: String? = null @Enumerated(EnumType.STRING) var errorMessage: Message? = null diff --git a/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt b/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt index 0a334cd7bd..486b6d2522 100644 --- a/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt +++ b/backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt @@ -4,6 +4,7 @@ enum class BatchJobChunkExecutionStatus( val completed: Boolean ) { PENDING(false), + RUNNING(false), SUCCESS(true), FAILED(true), CANCELLED(true), diff --git a/backend/data/src/main/kotlin/io/tolgee/model/batch/params/SetKeysNamespaceParams.kt b/backend/data/src/main/kotlin/io/tolgee/model/batch/params/SetKeysNamespaceParams.kt index 86bd3dce2a..b44c680b99 100644 --- a/backend/data/src/main/kotlin/io/tolgee/model/batch/params/SetKeysNamespaceParams.kt +++ b/backend/data/src/main/kotlin/io/tolgee/model/batch/params/SetKeysNamespaceParams.kt @@ -1,5 +1,5 @@ package io.tolgee.model.batch.params class SetKeysNamespaceParams { - var namespace: String = "" + var namespace: String? = null } diff --git a/backend/data/src/main/kotlin/io/tolgee/repository/TagRepository.kt b/backend/data/src/main/kotlin/io/tolgee/repository/TagRepository.kt index 7f4671a442..05f579188e 100644 --- a/backend/data/src/main/kotlin/io/tolgee/repository/TagRepository.kt +++ b/backend/data/src/main/kotlin/io/tolgee/repository/TagRepository.kt @@ -7,6 +7,7 @@ import io.tolgee.model.key.Tag import org.springframework.data.domain.Page import org.springframework.data.domain.Pageable import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying import org.springframework.data.jpa.repository.Query interface TagRepository : JpaRepository { @@ -56,4 +57,15 @@ interface TagRepository : JpaRepository { ) fun getTagsWithKeyMetas(tagIds: Iterable): List fun findAllByProjectId(projectId: Long): List + + @Modifying(flushAutomatically = true) + @Query( + """ + delete from Tag t + where t.id in + (select tag.id from Tag tag left join tag.keyMetas km group by tag.id having count(km) = 0) + and t.project.id = :projectId + """ + ) + fun deleteAllUnused(projectId: Long) } diff --git a/backend/data/src/main/kotlin/io/tolgee/service/key/KeyService.kt b/backend/data/src/main/kotlin/io/tolgee/service/key/KeyService.kt index b9651f8670..e0c0771518 100644 --- a/backend/data/src/main/kotlin/io/tolgee/service/key/KeyService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/service/key/KeyService.kt @@ -169,7 +169,18 @@ class KeyService( val projectId = keys.map { it.project.id }.distinct().singleOrNull() ?: return val namespaceEntity = namespaceService.findOrCreate(namespace, projectId) - keys.forEach { it.namespace = namespaceEntity } + val oldNamespaces = keys.map { + val oldNamespace = it.namespace + it.namespace = namespaceEntity + oldNamespace + } + + val modifiedNamespaces = oldNamespaces + .filter { it?.name != namespace } + .filterNotNull() + .distinctBy { it.id } + + namespaceService.deleteIfUnused(modifiedNamespaces) keyRepository.saveAll(keys) } diff --git a/backend/data/src/main/kotlin/io/tolgee/service/key/NamespaceService.kt b/backend/data/src/main/kotlin/io/tolgee/service/key/NamespaceService.kt index 2df9d58f13..55f07da5a1 100644 --- a/backend/data/src/main/kotlin/io/tolgee/service/key/NamespaceService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/service/key/NamespaceService.kt @@ -20,7 +20,11 @@ class NamespaceService( ) { private fun getKeysInNamespaceCount(namespace: Namespace?): Long? { namespace ?: return null - return namespaceRepository.getKeysInNamespaceCount(listOf(namespace.id)).firstOrNull()?.get(1) ?: 0 + return getKeysInNamespaceCount(listOf(namespace.id)).values.firstOrNull() + } + + private fun getKeysInNamespaceCount(namespaceIds: List): Map { + return namespaceRepository.getKeysInNamespaceCount(namespaceIds).associate { it[0] to it[1] } } fun deleteUnusedNamespaces(namespaces: List) { @@ -47,9 +51,15 @@ class NamespaceService( fun deleteIfUnused(namespace: Namespace?) { namespace ?: return - val count = getKeysInNamespaceCount(namespace) - if (count == 0L) { - delete(namespace) + deleteIfUnused(listOf(namespace)) + } + + fun deleteIfUnused(namespaces: List) { + val counts = getKeysInNamespaceCount(namespaces.map { it.id }) + namespaces.forEach { + if (counts[it.id] == 0L || counts[it.id] == null) { + delete(it) + } } } diff --git a/backend/data/src/main/kotlin/io/tolgee/service/key/TagService.kt b/backend/data/src/main/kotlin/io/tolgee/service/key/TagService.kt index 876a50f95d..8cf2b9df2f 100644 --- a/backend/data/src/main/kotlin/io/tolgee/service/key/TagService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/service/key/TagService.kt @@ -121,6 +121,14 @@ class TagService( keyMetaService.save(keyMeta) } } + + keysWithFetchedTags.map { it.project.id }.forEach { + this.removeUnusedTags(it) + } + } + + private fun removeUnusedTags(projectId: Long) { + tagRepository.deleteAllUnused(projectId) } private fun getSingleProjectId(keysWithTags: Map): Long { diff --git a/backend/data/src/main/kotlin/io/tolgee/service/machineTranslation/MtService.kt b/backend/data/src/main/kotlin/io/tolgee/service/machineTranslation/MtService.kt index 2682e4c57b..02856addb0 100644 --- a/backend/data/src/main/kotlin/io/tolgee/service/machineTranslation/MtService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/service/machineTranslation/MtService.kt @@ -73,12 +73,19 @@ class MtService( ) } - fun getPrimaryMachineTranslations(key: Key, targetLanguages: List): + fun getPrimaryMachineTranslations(key: Key, targetLanguages: List, isBatch: Boolean): List { val baseLanguage = projectService.getOrCreateBaseLanguage(key.project.id)!! val baseTranslationText = translationService.find(key, baseLanguage).orElse(null)?.text ?: return targetLanguages.map { null } - return getPrimaryMachineTranslations(key.project, baseTranslationText, key.id, baseLanguage, targetLanguages) + return getPrimaryMachineTranslations( + key.project, + baseTranslationText, + key.id, + baseLanguage, + targetLanguages, + isBatch + ) } private fun getPrimaryMachineTranslations( @@ -86,7 +93,8 @@ class MtService( baseTranslationText: String, keyId: Long?, baseLanguage: Language, - targetLanguages: List + targetLanguages: List, + isBatch: Boolean ): List { publishBeforeEvent(project) @@ -119,7 +127,8 @@ class MtService( baseLanguage.tag, languageIdxPairs.map { it.second.tag }, service, - metadata = metadata + metadata = metadata, + isBatch = isBatch ) val withReplacedParams = translateResults.map { translateResult -> @@ -171,7 +180,8 @@ class MtService( sourceLanguageTag = baseLanguage.tag, targetLanguageTag = targetLanguage.tag, services = servicesToUse, - metadata = metadata + metadata = metadata, + isBatch = false ) val actualPrice = results.entries.sumOf { it.value.actualPrice } diff --git a/backend/data/src/main/kotlin/io/tolgee/service/translation/AutoTranslationService.kt b/backend/data/src/main/kotlin/io/tolgee/service/translation/AutoTranslationService.kt index 6c949f0b06..9af4f07325 100644 --- a/backend/data/src/main/kotlin/io/tolgee/service/translation/AutoTranslationService.kt +++ b/backend/data/src/main/kotlin/io/tolgee/service/translation/AutoTranslationService.kt @@ -28,7 +28,8 @@ class AutoTranslationService( key: Key, languageTags: List? = null, useTranslationMemory: Boolean? = null, - useMachineTranslation: Boolean? = null + useMachineTranslation: Boolean? = null, + isBatch: Boolean, ) { val config = getConfig(key.project) @@ -36,22 +37,23 @@ class AutoTranslationService( autoTranslateUsingTm(key, languageTags?.toSet()) } if (useMachineTranslation ?: config.usingPrimaryMtService) { - autoTranslateUsingMachineTranslation(key, languageTags?.toSet()) + autoTranslateUsingMachineTranslation(key, languageTags?.toSet(), isBatch) } } - private fun autoTranslateUsingMachineTranslation(key: Key, languageTags: Set? = null) { + private fun autoTranslateUsingMachineTranslation(key: Key, languageTags: Set? = null, isBatch: Boolean) { val translations = languageTags?.let { getTranslations(key, languageTags) } ?: getUntranslatedTranslations(key) - autoTranslateUsingMachineTranslation(translations, key) + autoTranslateUsingMachineTranslation(translations, key, isBatch) } private fun autoTranslateUsingMachineTranslation( translations: List, - key: Key + key: Key, + isBatch: Boolean ) { val languages = translations.map { it.language } - mtService.getPrimaryMachineTranslations(key, languages) + mtService.getPrimaryMachineTranslations(key, languages, isBatch) .zip(translations) .asSequence() .forEach { (translateResult, translation) -> diff --git a/backend/data/src/main/resources/db/changelog/schema.xml b/backend/data/src/main/resources/db/changelog/schema.xml index 09a3371bcf..79fcc47a54 100644 --- a/backend/data/src/main/resources/db/changelog/schema.xml +++ b/backend/data/src/main/resources/db/changelog/schema.xml @@ -2647,4 +2647,24 @@ + + + + + + + + + + + + + + + + + + + +