Skip to content

Commit

Permalink
feat: 支持查询相似路径 TencentBlueKing#1949
Browse files Browse the repository at this point in the history
* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持查询相似路径 TencentBlueKing#1949

* feat: 支持根据历史访问记录计算下次加载时间 TencentBlueKing#1949

* feat: 移除流水线仓库路径的pid与bid TencentBlueKing#1949

* feat: 移除流水线仓库路径的bid TencentBlueKing#1949

* feat: 判断是否有相似路径被访问后根据配置生成预加载计划 TencentBlueKing#1949

* feat: 判断是否有相似路径被访问后根据配置生成预加载计划 TencentBlueKing#1949

* feat: 增量生成向量化数据 TencentBlueKing#1949

* feat: 判断是否有相似路径被访问后根据配置生成预加载计划 TencentBlueKing#1949

* feat: 判断是否有相似路径被访问后根据配置生成预加载计划 TencentBlueKing#1949

* feat: 判断是否有相似路径被访问后根据配置生成预加载计划 TencentBlueKing#1949

* feat: 支持创建预加载策略时不设置cron,增加相似路径预加载计划生成日志 TencentBlueKing#1949

* feat: 支持创建INTELLIGENT类型的预加载策略 TencentBlueKing#1949

* feat: 批量向量化并耗时日志 TencentBlueKing#1949

* feat: 批量向量化并耗时日志 TencentBlueKing#1949

* feat: 修复milvus配置失败 TencentBlueKing#1949

* feat: 修复milvus配置失败 TencentBlueKing#1949

* feat: 支持配置模型维度 TencentBlueKing#1949

* feat: 支持模拟预加载 TencentBlueKing#1949
  • Loading branch information
cnlkl authored Jul 31, 2024
1 parent 6a37309 commit 3a9b478
Show file tree
Hide file tree
Showing 26 changed files with 1,327 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/backend/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ allprojects {
dependency("commons-io:commons-io:${Versions.CommonsIO}")
dependency("com.squareup.okhttp3:okhttp:${Versions.OKhttp}")
dependency("com.google.guava:guava:${Versions.Guava}")
dependency("com.google.protobuf:protobuf-java:${Versions.ProtobufJava}")
dependency("com.google.protobuf:protobuf-java-util:${Versions.ProtobufJava}")
dependency("com.tencent.polaris:polaris-discovery-factory:${Versions.Polaris}")
dependency("org.apache.commons:commons-text:${Versions.CommonsText}")
Expand Down
3 changes: 2 additions & 1 deletion src/backend/buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Versions {
const val Redline = "1.2.10"
const val SkyWalkingApmToolkit = "8.10.0"
const val Gson = "2.9.0"
const val ProtobufJava = "3.19.4"
const val ProtobufJava = "3.24.0"
const val Guava = "31.1-jre"
const val Shedlock = "4.12.0"
const val JGit = "5.11.0.202103091610-r"
Expand Down Expand Up @@ -70,4 +70,5 @@ object Versions {
const val JavaCpp = "1.5.9"
const val Notice = "1.0.0"
const val SpringCloudFunction = "3.2.11"
const val Milvus = "2.4.1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,20 @@ data class ArtifactPreloadProperties(
* 允许同时预加载的制品个数
*/
var preloadConcurrency: Int = 8,
/**
* 预加载策略未配置时间时使用的预加载时间,取值范围[0,24],可配置多个,将选择离当前时间最近的一个作为预加载时间
*/
var preloadHourOfDay: Set<Int> = emptySet(),
/**
* 减去随机时间,避免同时加载过多文件
*/
var maxRandomSeconds: Long = 600L,
/**
* 根据sha256查询到的node数量超过该值时将不生成预加载计划,避免预加载计划创建时间过久
*/
var maxNodes: Int = 10,
/**
* 是否仅模拟预加载,为true时不执行加载计划,仅输出一条日志
*/
var mock: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ data class ArtifactPreloadStrategyCreateRequest(
@ApiModelProperty("限制只对最近一段时间内创建的制品执行预加载")
val recentSeconds: Long,
@ApiModelProperty("预加载执行时间")
val preloadCron: String,
val preloadCron: String? = null,
@ApiModelProperty("策略类型")
val type: String = PreloadStrategyType.CUSTOM.name,
@ApiModelProperty("操作人")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ enum class PreloadStrategyType {
CUSTOM_GENERATED,

/**
* 智能预加载策略,通过BkBase中的模型预测预加载时间
* 智能预加载策略
*/
INTELLIGENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ class ArtifactPreloadPlanServiceImpl(
if (!properties.enabled) {
return
}
val option = NodeListOption(pageSize = MAX_PAGE_SIZE, includeFolder = false)
val option = NodeListOption(pageSize = properties.maxNodes, includeFolder = false)
val res = nodeClient.listPageNodeBySha256(sha256, option)
val nodes = res.data?.records ?: return
if (nodes.size >= MAX_PAGE_SIZE) {
if (nodes.size >= properties.maxNodes) {
// 限制查询出来的最大node数量,避免预加载计划创建时间过久
logger.warn("nodes of sha256[$sha256] exceed max page size[$MAX_PAGE_SIZE]")
logger.warn("nodes of sha256[$sha256] exceed max page size[${properties.maxNodes}]")
return
}
// node属于同一项目仓库的概率较大,缓存避免频繁查询策略
Expand Down Expand Up @@ -179,6 +179,5 @@ class ArtifactPreloadPlanServiceImpl(
companion object {
private val logger = LoggerFactory.getLogger(ArtifactPreloadPlanServiceImpl::class.java)
private const val REPO_ID_DELIMITERS = "/"
private const val MAX_PAGE_SIZE = 1000
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,7 @@ class DefaultPreloadPlanExecutor(
if (System.currentTimeMillis() - plan.executeTime > preloadProperties.planTimeout.toMillis()) {
throw RuntimeException("plan timeout[${plan.executeTime}], ${plan.artifactInfo()}")
}
val cacheFile = Paths.get(credentials.cache.path, fileLocator.locate(plan.sha256), plan.sha256)
val cacheFileLock = Paths.get(credentials.cache.path, StringPool.TEMP, "${plan.sha256}.locked")
val throughput = if (cacheFile.existReal()) {
Files.setLastModifiedTime(cacheFile, FileTime.fromMillis(System.currentTimeMillis()))
logger.info("cache already exists, update LastModifiedTime, ${plan.artifactInfo()}")
null
} else if (cacheFileLock.existReal()) {
logger.info("cache file is loading, skip preload, ${plan.artifactInfo()}")
null
} else {
// 执行预加载
doLoad(plan.sha256, plan.size, credentials)
}
val throughput = load(plan, credentials)
logger.info("preload success, ${plan.artifactInfo()}, throughput[$throughput]")
listener?.onPreloadSuccess(plan)
} catch (e: Exception) {
Expand All @@ -125,6 +113,27 @@ class DefaultPreloadPlanExecutor(
}
}

private fun load(plan: ArtifactPreloadPlan, credentials: StorageCredentials): Throughput? {
if (preloadProperties.mock) {
logger.info("mock load cache, ${plan.artifactInfo()}")
return null
}

val cacheFile = Paths.get(credentials.cache.path, fileLocator.locate(plan.sha256), plan.sha256)
val cacheFileLock = Paths.get(credentials.cache.path, StringPool.TEMP, "${plan.sha256}.locked")
return if (cacheFile.existReal()) {
Files.setLastModifiedTime(cacheFile, FileTime.fromMillis(System.currentTimeMillis()))
logger.info("cache already exists, update LastModifiedTime, ${plan.artifactInfo()}")
null
} else if (cacheFileLock.existReal()) {
logger.info("cache file is loading, skip preload, ${plan.artifactInfo()}")
null
} else {
// 执行预加载
doLoad(plan.sha256, plan.size, credentials)
}
}

private fun doLoad(sha256: String, size: Long, credentials: StorageCredentials?): Throughput {
return cacheStorageService.load(sha256, Range.full(size), credentials)?.use {
val aisCacheEnabled = it.getMetadata(ArtifactInputStream.METADATA_KEY_CACHE_ENABLED)
Expand Down
11 changes: 11 additions & 0 deletions src/backend/job/biz-job/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ dependencies {
implementation(project(":common:common-operate:operate-service"))
implementation("org.springframework.boot:spring-boot-starter-data-mongodb")
implementation("io.micrometer:micrometer-registry-prometheus")
implementation("io.milvus:milvus-sdk-java:${Versions.Milvus}") {
exclude(group = "org.slf4j")
exclude(group = "org.apache.logging.log4j")
exclude(group = "org.testcontainers")
exclude(group = "com.azure")
exclude(group = "com.amazonaws")
exclude(group = "io.minio")
exclude(group = "org.apache.hadoop")
exclude(group = "org.apache.parquet")
exclude(group = "com.squareup.okhttp3")
}
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("de.flapdoodle.embed:de.flapdoodle.embed.mongo")
testImplementation("org.mockito.kotlin:mockito-kotlin")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.cache.preload

import com.tencent.bkrepo.auth.constant.PIPELINE
import com.tencent.bkrepo.common.artifact.event.base.EventType
import com.tencent.bkrepo.common.mongo.constant.ID
import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID
import com.tencent.bkrepo.common.mongo.dao.util.sharding.MonthRangeShardingUtils
import com.tencent.bkrepo.common.operate.service.model.TOperateLog
import com.tencent.bkrepo.job.batch.base.DefaultContextJob
import com.tencent.bkrepo.job.batch.base.JobContext
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.AiProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.Document
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.EmbeddingModel
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.MilvusVectorStoreProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.VectorStore
import com.tencent.bkrepo.job.config.properties.ArtifactAccessLogEmbeddingJobProperties
import io.milvus.client.MilvusClient
import org.bson.types.ObjectId
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.data.domain.Sort
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.find
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.stereotype.Component
import java.time.Duration
import java.time.LocalDate
import java.time.LocalDateTime
import kotlin.system.measureTimeMillis

@Component
@EnableConfigurationProperties(ArtifactAccessLogEmbeddingJobProperties::class)
@ConditionalOnProperty("job.artifact-access-log-embedding.enabled")
class ArtifactAccessLogEmbeddingJob(
private val aiProperties: AiProperties,
private val properties: ArtifactAccessLogEmbeddingJobProperties,
private val mongoTemplate: MongoTemplate,
private val milvusClient: MilvusClient,
private val embeddingModel: EmbeddingModel,
) : DefaultContextJob(properties) {

override fun getLockAtMostFor(): Duration = Duration.ofDays(7L)

override fun doStart0(jobContext: JobContext) {
val lastMonthVectorStore = createVectorStore(1L)
val curMonthVectorStore = createVectorStore(0L)
var lastMonthCollectionExists = lastMonthVectorStore.collectionExists()
val curMonthCollectionExists = curMonthVectorStore.collectionExists()

if (lastMonthCollectionExists && !curMonthCollectionExists) {
// 可能由于数据生成过程被中断导致存在上月数据不存在当月数据,需要删除重新生成
lastMonthVectorStore.dropCollection()
lastMonthCollectionExists = false
}

if (!lastMonthCollectionExists) {
// 上个月的数据不存在时,使用上个月的访问记录生成数据
logger.info("collection[${lastMonthVectorStore.collectionName()}] not exists, try to create")
lastMonthVectorStore.createCollection()
lastMonthVectorStore.findAccessLogAndInsert(1L)
logger.info("insert data into collection[${lastMonthVectorStore.collectionName()}] success")
}

if (!curMonthCollectionExists) {
// 当月数据不存在时候,使用月初至今的访问记录生成数据
logger.info("collection[${curMonthVectorStore.collectionName()}] not exists, try to create")
curMonthVectorStore.createCollection()
curMonthVectorStore.findAccessLogAndInsert(0L, before = LocalDate.now().atStartOfDay())
} else {
// 已有数据,使用昨日数据生成记录
logger.info("collection[${curMonthVectorStore.collectionName()}] exists, insert data of last day")
val startOfToday = LocalDate.now().atStartOfDay()
val startOfLastDay = LocalDate.now().minusDays(1L).atStartOfDay()
curMonthVectorStore.findAccessLogAndInsert(0L, after = startOfLastDay, before = startOfToday)
}
logger.info("insert data into collection[${curMonthVectorStore.collectionName()}] success")

// 删除过期数据
val deprecatedVectorStore = createVectorStore(2L)
if (deprecatedVectorStore.collectionExists()) {
logger.info("deprecated collection[${deprecatedVectorStore.collectionName()}] exists")
deprecatedVectorStore.dropCollection()
logger.info("drop collection [${deprecatedVectorStore.collectionName()}] success")
}
}

/**
* 获取访问记录并写入向量数据库
*/
private fun VectorStore.findAccessLogAndInsert(
minusMonth: Long,
after: LocalDateTime? = null,
before: LocalDateTime? = null
) {
properties.projects.forEach { projectId ->
processDataBatch(projectId, minusMonth, after, before) { paths ->
val documents = paths.map { Document(content = it, metadata = emptyMap()) }
val elapsed = measureTimeMillis { insert(documents) }
logger.info("[$projectId] insert ${documents.size} data into [${collectionName()}] in $elapsed ms")
}
}
}

private fun createVectorStore(minusMonth: Long): VectorStore {
val seq = MonthRangeShardingUtils.shardingSequenceFor(LocalDateTime.now().minusMonths(minusMonth), 1)
val collectionName = "${aiProperties.collectionPrefix}$seq"

val config = MilvusVectorStoreProperties(
databaseName = aiProperties.databaseName,
collectionName = collectionName,
embeddingDimension = embeddingModel.dimensions(),
)
return MilvusVectorStore(config, milvusClient, embeddingModel)
}

/**
* 获取有访问记录的路径
*/
private fun processDataBatch(
projectId: String,
minusMonth: Long,
after: LocalDateTime?,
before: LocalDateTime?,
handler: (paths: Set<String>) -> Unit,
) {
val collectionName = collectionName(minusMonth)
if (!mongoTemplate.collectionExists(collectionName)) {
logger.warn("mongo collection[$collectionName] not exists")
return
}
val pageSize = properties.batchSize
var lastId = ObjectId(MIN_OBJECT_ID)
var querySize: Int
val criteria = buildCriteria(projectId, after, before)
do {
val query = Query(criteria)
.addCriteria(Criteria.where(ID).gt(lastId))
.limit(pageSize)
.with(Sort.by(ID).ascending())
query.fields().include(
TOperateLog::repoName.name,
TOperateLog::resourceKey.name,
TOperateLog::createdDate.name
)
val data = mongoTemplate.find<Map<String, Any?>>(query, collectionName)
if (data.isEmpty()) {
break
}
// 记录制品访问时间
val accessPaths = data.mapTo(HashSet(pageSize)) {
val repoName = it[TOperateLog::repoName.name] as String
val fullPath = it[TOperateLog::resourceKey.name] as String
val projectRepoFullPath = if (repoName == PIPELINE) {
// 流水线仓库路径/p-xxx/b-xxx/xxx中的构建id不参与相似度计算
val secondSlashIndex = fullPath.indexOf("/", 1)
val pipelinePath = fullPath.substring(0, secondSlashIndex)
val artifactPath = fullPath.substring(fullPath.indexOf("/", secondSlashIndex + 1))
pipelinePath + artifactPath
} else {
fullPath
}
"/$projectId/$repoName$projectRepoFullPath"
}

handler(accessPaths)
querySize = data.size
lastId = data.last()[ID] as ObjectId
} while (querySize == pageSize && shouldRun())
}

private fun collectionName(minusMonth: Long): String {
// 查询上个月的记录
val seq = MonthRangeShardingUtils.shardingSequenceFor(LocalDateTime.now().minusMonths(minusMonth), 1)
return "artifact_oplog_$seq"
}

private fun buildCriteria(projectId: String, after: LocalDateTime?, before: LocalDateTime?): Criteria {
val criteria = Criteria
.where(TOperateLog::projectId.name).isEqualTo(projectId)
.and(TOperateLog::type.name).isEqualTo(EventType.NODE_DOWNLOADED.name)
if (after != null && before != null) {
criteria.and(TOperateLog::createdDate.name).gte(after).lt(before)
} else {
after?.let { criteria.and(TOperateLog::createdDate.name).gte(it) }
before?.let { criteria.and(TOperateLog::createdDate.name).lt(it) }
}
return criteria
}

companion object {
private val logger = LoggerFactory.getLogger(ArtifactAccessLogEmbeddingJob::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.cache
package com.tencent.bkrepo.job.batch.task.cache.preload

import com.tencent.bkrepo.common.artifact.cache.service.impl.ArtifactAccessRecorder
import com.tencent.bkrepo.job.batch.base.DefaultContextJob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.cache
package com.tencent.bkrepo.job.batch.task.cache.preload

import com.tencent.bkrepo.common.artifact.cache.config.ArtifactPreloadProperties
import com.tencent.bkrepo.common.artifact.cache.service.ArtifactPreloadPlanService
Expand Down
Loading

0 comments on commit 3a9b478

Please sign in to comment.