Skip to content

Commit

Permalink
bug: 修复大文件下载,导致cos qps限速报错 TencentBlueKing#1363 (TencentBlueKing#1367)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixncheng authored Nov 8, 2023
1 parent 104af41 commit 57da2f6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ data class InnerCosCredentials(
var taskInterval: Long = 10,
var timeout: Long = 10_000,
var minimumPartSize: Long = 10,
var maxDownloadParts: Int = 10000
var maxDownloadParts: Int = 10000,
var qps: Int = 10,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ class ClientConfig(private val credentials: InnerCosCredentials) {

var timeout: Long = credentials.download.timeout

/**
* 下载分块的qps限速
* */
var qps: Int = credentials.download.qps

private fun createEndpointResolver(): EndpointResolver {
return if (credentials.modId != null && credentials.cmdId != null) {
PolarisEndpointResolver(credentials.modId!!, credentials.cmdId!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package com.tencent.bkrepo.common.storage.innercos.client

import com.google.common.util.concurrent.RateLimiter
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.api.concurrent.ComparableFutureTask
import com.tencent.bkrepo.common.api.concurrent.PriorityCallable
Expand Down Expand Up @@ -96,6 +97,7 @@ import kotlin.system.measureNanoTime
/**
* Cos Client
*/
@Suppress("UnstableApiUsage")
class CosClient(val credentials: InnerCosCredentials) {
private val config: ClientConfig = ClientConfig(credentials)

Expand Down Expand Up @@ -363,9 +365,10 @@ class CosClient(val credentials: InnerCosCredentials) {
* */
var i = 0
var priority = System.currentTimeMillis()
val rateLimiter = RateLimiter.create(config.qps.toDouble())
while (factory.hasMoreRequests()) {
val downloadPartRequest = factory.nextDownloadPartRequest()
val task = DownloadTask(i, downloadPartRequest, tempRootPath, session, priority)
val task = DownloadTask(i, downloadPartRequest, tempRootPath, session, priority, rateLimiter)
val futureTask = ComparableFutureTask(task)
executor.execute(futureTask)
val futureWrapper = EnhanceFileChunkedFutureWrapper(futureTask) {
Expand Down Expand Up @@ -457,13 +460,15 @@ class CosClient(val credentials: InnerCosCredentials) {
val downloadPartRequest: GetObjectRequest,
private val rootPath: Path,
private val session: DownloadSession,
private val priority: Long
private val priority: Long,
private val rateLimiter: RateLimiter,
) : PriorityCallable<File, DownloadTask>() {

override fun call(): File {
// 任务可以取消,所以可能会产生中断异常,如果调用方获取结果,则可以捕获异常。
// 但是通常是由于调用方异常而提前终止相关任务,所以这里产生的中断异常大部分情况下无影响。
retry(RETRY_COUNT) {
rateLimiter.acquire()
session.activeCount.incrementAndGet()
// 为防止重试导致文件名重复
val fileName = "$DOWNLOADING_CHUNkED_PREFIX${seq}_${priority}_${it}$DOWNLOADING_CHUNkED_SUFFIX"
Expand Down

0 comments on commit 57da2f6

Please sign in to comment.