Skip to content

Commit

Permalink
impr: 优化并发和单元测试 TencentBlueKing#2161
Browse files Browse the repository at this point in the history
* impr: 并发解压报错,优化代码 TencentBlueKing#2123

* impr: 增加任务执行超时 TencentBlueKing#2123

* impr: 修复单元测试 TencentBlueKing#2123

* impr: 修复单元测试 TencentBlueKing#2123
  • Loading branch information
felixncheng authored May 23, 2024
1 parent f7ebf6c commit 1395864
Show file tree
Hide file tree
Showing 26 changed files with 518 additions and 311 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.tencent.bkrepo.archive.config

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.archive.core.FileStorageFileProvider
import com.tencent.bkrepo.archive.core.PriorityFileProvider
import com.tencent.bkrepo.archive.core.provider.FileStorageFileProvider
import com.tencent.bkrepo.archive.core.provider.PriorityFileProvider
import com.tencent.bkrepo.archive.utils.ArchiveUtils
import com.tencent.bkrepo.common.security.http.core.HttpAuthSecurity
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data class GcProperties(
var path: String = System.getProperty("java.io.tmpdir"),
var diffThreads: Int = 1, // 文件差分:CPU 内存 IO
var ratio: Float = 0.5f, // 重复率阈值
var signFileCacheTime: Duration = Duration.ofHours(6), // 签名文件缓存事件
var cacheExpireTime: Duration = Duration.ofHours(6), // 文件缓存时间
var bigChecksumFileThreshold: DataSize = DataSize.ofMegabytes(100),
var bigFileCompressPoolSize: Int = 1,
var maxConcurrency: Int = 100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeoutException
import java.util.function.Consumer
import java.util.function.Function

Expand Down Expand Up @@ -45,12 +46,14 @@ class ActiveTaskSubscriber<T, R>(
// 尝试获取许可,成功则继续执行
if (semaphore.tryAcquire()) {
accept.apply(value.obj)
.onErrorResume { Mono.empty() }
.timeout(Duration.ofDays(TASK_TIMEOUT_OF_DAYS), Mono.error(TimeoutException("task timeout")))
.doOnError { logger.error("Failed to execute task [${value.priority}]", it) }
.doFinally {
logger.info("Finish execute task[${value.priority}]")
semaphore.release()
subscription?.request(1)
}
.onErrorResume { Mono.empty() }
.subscribe()
} else {
// 如果许可已被占用,则执行预先设计的降级
Expand All @@ -61,5 +64,6 @@ class ActiveTaskSubscriber<T, R>(

companion object {
private val logger = LoggerFactory.getLogger(ActiveTaskSubscriber::class.java)
private const val TASK_TIMEOUT_OF_DAYS = 1L // 1day
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ class FileCoreProcessor(
}
logger.info("Request $request files to compress or uncompress.")
val req = request.toInt()
val query = Query.query(where(TCompressFile::status).isEqualTo(CompressStatus.WAIT_TO_UNCOMPRESS))
val criteria = where(TCompressFile::status).isEqualTo(CompressStatus.WAIT_TO_UNCOMPRESS)
.and(TArchiveFile::lastModifiedDate.name).lt(LocalDateTime.now().minusMinutes(UNCOMPRESS_RETRY_MIN_MINUTES))
val query = Query.query(criteria)
.with(Sort.by(Sort.Direction.DESC, TCompressFile::lastModifiedDate.name))
.limit(req)
reactiveMongoTemplate.find(query, TCompressFile::class.java).collectList().zipWhen {
Expand Down Expand Up @@ -223,5 +225,6 @@ class FileCoreProcessor(
companion object {
private val logger = LoggerFactory.getLogger(FileCoreProcessor::class.java)
private const val COS_RESTORE_MIN_MINUTES = 1L
private const val UNCOMPRESS_RETRY_MIN_MINUTES = 1L
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.tencent.bkrepo.archive.core

import com.tencent.bkrepo.common.api.concurrent.PriorityRunnableWrapper
import reactor.core.publisher.Mono
import java.util.concurrent.Executor

fun <T, R> Mono<T>.mapPriority(executor: Executor, seq: Int, mapping: (t: T) -> R): Mono<R> {
return this.flatMap {
createPriorityMono(executor, seq) { mapping(it) }
}
}

fun <R> createPriorityMono(executor: Executor, seq: Int, mapping: () -> R): Mono<R> {
return Mono.create { sink ->
val wrapper = PriorityRunnableWrapper(seq) {
try {
sink.success(mapping())
} catch (e: Exception) {
sink.error(e)
}
}
executor.execute(wrapper)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.archive.ArchiveStatus
import com.tencent.bkrepo.archive.config.ArchiveProperties
import com.tencent.bkrepo.archive.constant.ArchiveStorageClass
import com.tencent.bkrepo.archive.core.provider.FileTask
import com.tencent.bkrepo.archive.event.FileArchivedEvent
import com.tencent.bkrepo.archive.event.FileRestoredEvent
import com.tencent.bkrepo.archive.core.FileProvider
import com.tencent.bkrepo.archive.core.provider.PriorityFileProvider
import com.tencent.bkrepo.archive.core.TaskResult
import com.tencent.bkrepo.archive.model.TArchiveFile
import com.tencent.bkrepo.archive.repository.ArchiveFileDao
Expand All @@ -29,12 +30,13 @@ import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Function

@Component
class ArchiveManager(
private val archiveProperties: ArchiveProperties,
private val fileProvider: FileProvider,
private val fileProvider: PriorityFileProvider,
private val archiveFileDao: ArchiveFileDao,
private val archiveFileRepository: ArchiveFileRepository,
private val storageService: StorageService,
Expand All @@ -54,6 +56,7 @@ class ArchiveManager(
ThreadFactoryBuilder().setNameFormat("archive-worker-%d").build(),
)
private val scheduler = Schedulers.fromExecutor(archiveThreadPool)
private val prioritySeq = AtomicInteger(Int.MIN_VALUE)

init {
if (!Files.exists(compressedPath)) {
Expand Down Expand Up @@ -115,7 +118,8 @@ class ArchiveManager(
Files.createDirectories(dir)
val credentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey)
val begin = System.nanoTime()
val ret = fileProvider.get(sha256, Range.full(file.size), credentials)
val fileTask = FileTask(sha256, Range.full(file.size), credentials)
val ret = fileProvider.get(fileTask)
.publishOn(scheduler)
.flatMap {
val filePath = dir.resolve(sha256)
Expand Down Expand Up @@ -197,7 +201,8 @@ class ArchiveManager(
Files.createDirectories(dir)
val begin = System.nanoTime()
val range = if (file.compressedSize == -1L) Range.FULL_RANGE else Range.full(file.compressedSize)
val ret = fileProvider.get(key, range, credentials)
val fileTask = FileTask(key, range, credentials, prioritySeq.getAndIncrement())
val ret = fileProvider.get(fileTask)
.publishOn(scheduler)
.flatMap {
val archiveFilePath = dir.resolve(key)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.tencent.bkrepo.archive.core.compress

import com.tencent.bkrepo.common.api.concurrent.PriorityRunnableWrapper
import com.tencent.bkrepo.archive.core.mapPriority
import com.tencent.bkrepo.common.bksync.file.BDUtils
import com.tencent.bkrepo.common.bksync.transfer.exception.TooLowerReuseRateException
import com.tencent.bkrepo.common.storage.monitor.Throughput
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
Expand All @@ -17,7 +16,6 @@ class BDCompressor(
private val bigFileCompressPool: Executor,
private val bigChecksumFileThreshold: Long,
) {

private val seq = AtomicInteger(0)

/**
Expand All @@ -30,32 +28,21 @@ class BDCompressor(
destKey: String,
workDir: Path,
): Mono<File> {
return Mono.zip(checksumFile, srcFile) { checksum, src ->
compress(src, checksum, srcKey, destKey, workDir)
}.flatMap { it }
return Mono.zip(checksumFile, srcFile).mapPriority(executor, seq.getAndIncrement()) {
compress(it.t2, it.t1, srcKey, destKey, workDir)
}
}

private fun compress(src: File, checksum: File, srcKey: String, destKey: String, workDir: Path): Mono<File> {
return Mono.create {
val wrapper = PriorityRunnableWrapper(seq.getAndIncrement()) {
try {
val start = System.nanoTime()
val file = BDUtils.deltaByChecksumFile(src, checksum, srcKey, destKey, workDir, ratio)
val nanos = System.nanoTime() - start
val throughput = Throughput(src.length(), nanos)
logger.info("Success to bd compress $srcKey,$throughput.")
it.success(file)
} catch (e: TooLowerReuseRateException) {
logger.info("File[$srcKey] duplication rate detected is too low.")
it.error(e)
} catch (e: Exception) {
logger.error("Failed to bd compress $srcKey", e)
it.error(e)
} finally {
src.delete()
}
}
chooseScheduler(checksum.length()).execute(wrapper)
private fun compress(src: File, checksum: File, srcKey: String, destKey: String, workDir: Path): File {
try {
val start = System.nanoTime()
val file = BDUtils.deltaByChecksumFile(src, checksum, srcKey, destKey, workDir, ratio)
val nanos = System.nanoTime() - start
val throughput = Throughput(src.length(), nanos)
logger.info("Success to bd compress $srcKey,$throughput.")
return file
} finally {
src.delete()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.tencent.bkrepo.archive.core.compress

import com.tencent.bkrepo.common.api.concurrent.PriorityRunnableWrapper
import com.tencent.bkrepo.archive.core.mapPriority
import com.tencent.bkrepo.common.bksync.file.BDUtils
import com.tencent.bkrepo.common.storage.monitor.Throughput
import org.slf4j.LoggerFactory
Expand All @@ -13,36 +13,28 @@ import java.util.concurrent.atomic.AtomicInteger
class BDUncompressor(
private val executor: Executor,
) {

private val seq = AtomicInteger(Int.MIN_VALUE)

/**
* 根据源文件和签名文件,压缩成新的bd文件
* */
fun patch(bdFile: Mono<File>, baseFile: Mono<File>, sha256: String, workDir: Path): Mono<File> {
return Mono.zip(bdFile, baseFile) { bd, bsf ->
uncompress(bd, bsf, sha256, workDir)
}.flatMap { it }
return Mono.zip(baseFile, bdFile).mapPriority(executor, seq.getAndIncrement()) {
uncompress(it.t1, it.t2, sha256, workDir)
}
}

private fun uncompress(bdFile: File, baseFile: File, sha256: String, workDir: Path): Mono<File> {
return Mono.create {
val wrapper = PriorityRunnableWrapper(seq.getAndIncrement()) {
try {
val start = System.nanoTime()
val file = BDUtils.patch(bdFile, baseFile, workDir)
val nanos = System.nanoTime() - start
val throughput = Throughput(file.length(), nanos)
logger.info("Success to bd uncompress $sha256,$throughput.")
it.success(file)
} catch (e: Exception) {
logger.error("Failed to bd uncompress $sha256", e)
it.error(e)
} finally {
bdFile.delete()
baseFile.delete()
}
}
executor.execute(wrapper)
private fun uncompress(baseFile: File, bdFile: File, sha256: String, workDir: Path): File {
try {
val start = System.nanoTime()
val file = BDUtils.patch(bdFile, baseFile, workDir)
val nanos = System.nanoTime() - start
val throughput = Throughput(file.length(), nanos)
logger.info("Success to bd uncompress $sha256,$throughput.")
return file
} finally {
bdFile.delete()
}
}

Expand Down
Loading

0 comments on commit 1395864

Please sign in to comment.