diff --git a/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/credentials/InnerCosCredentials.kt b/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/credentials/InnerCosCredentials.kt index 25a7b15cde..b329f79bd2 100644 --- a/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/credentials/InnerCosCredentials.kt +++ b/src/backend/common/common-storage/storage-api/src/main/kotlin/com/tencent/bkrepo/common/storage/credentials/InnerCosCredentials.kt @@ -71,6 +71,7 @@ data class InnerCosCredentials( var taskInterval: Long = 10, var timeout: Long = 10_000, var minimumPartSize: Long = 10, - var maxDownloadParts: Int = 10000 + var maxDownloadParts: Int = 10000, + var qps: Int = 10, ) } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/ClientConfig.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/ClientConfig.kt index 7e62d78925..d8acde42be 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/ClientConfig.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/ClientConfig.kt @@ -126,6 +126,11 @@ class ClientConfig(private val credentials: InnerCosCredentials) { var timeout: Long = credentials.download.timeout + /** + * 下载分块的qps限速 + * */ + var qps: Int = credentials.download.qps + private fun createEndpointResolver(): EndpointResolver { return if (credentials.modId != null && credentials.cmdId != null) { PolarisEndpointResolver(credentials.modId!!, credentials.cmdId!!) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt index 83ffbf8956..7678b085a6 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt @@ -32,6 +32,7 @@ package com.tencent.bkrepo.common.storage.innercos.client +import com.google.common.util.concurrent.RateLimiter import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.common.api.concurrent.ComparableFutureTask import com.tencent.bkrepo.common.api.concurrent.PriorityCallable @@ -96,6 +97,7 @@ import kotlin.system.measureNanoTime /** * Cos Client */ +@Suppress("UnstableApiUsage") class CosClient(val credentials: InnerCosCredentials) { private val config: ClientConfig = ClientConfig(credentials) @@ -363,9 +365,10 @@ class CosClient(val credentials: InnerCosCredentials) { * */ var i = 0 var priority = System.currentTimeMillis() + val rateLimiter = RateLimiter.create(config.qps.toDouble()) while (factory.hasMoreRequests()) { val downloadPartRequest = factory.nextDownloadPartRequest() - val task = DownloadTask(i, downloadPartRequest, tempRootPath, session, priority) + val task = DownloadTask(i, downloadPartRequest, tempRootPath, session, priority, rateLimiter) val futureTask = ComparableFutureTask(task) executor.execute(futureTask) val futureWrapper = EnhanceFileChunkedFutureWrapper(futureTask) { @@ -457,13 +460,15 @@ class CosClient(val credentials: InnerCosCredentials) { val downloadPartRequest: GetObjectRequest, private val rootPath: Path, private val session: DownloadSession, - private val priority: Long + private val priority: Long, + private val rateLimiter: RateLimiter, ) : PriorityCallable() { override fun call(): File { // 任务可以取消,所以可能会产生中断异常,如果调用方获取结果,则可以捕获异常。 // 但是通常是由于调用方异常而提前终止相关任务,所以这里产生的中断异常大部分情况下无影响。 retry(RETRY_COUNT) { + rateLimiter.acquire() session.activeCount.incrementAndGet() // 为防止重试导致文件名重复 val fileName = "$DOWNLOADING_CHUNkED_PREFIX${seq}_${priority}_${it}$DOWNLOADING_CHUNkED_SUFFIX"